Postpone updates in unknown channel instead of dropping.

This commit is contained in:
levlam 2021-07-29 21:43:03 +03:00
parent d4dc4f2a50
commit f0333aa578
2 changed files with 33 additions and 22 deletions

View File

@ -7260,6 +7260,12 @@ void MessagesManager::cancel_user_dialog_action(DialogId dialog_id, const Messag
m->content->get_type());
}
void MessagesManager::add_postponed_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
int32 new_pts, int32 pts_count, Promise<Unit> &&promise) {
postponed_channel_updates_[dialog_id].emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
}
void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update,
int32 new_pts, int32 pts_count, Promise<Unit> &&promise,
const char *source, bool is_postponed_update) {
@ -7307,10 +7313,10 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
}
if (new_pts > pts && pts != new_pts - pts_count) {
LOG(INFO) << "Found a gap in the " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts
LOG(INFO) << "Found a gap in unknown " << dialog_id << " with pts = " << pts << ". new_pts = " << new_pts
<< ", pts_count = " << pts_count << " in update from " << source;
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
get_channel_difference(dialog_id, pts, true, "add_pending_channel_update 3");
promise.set_value(Unit());
return;
}
@ -7375,8 +7381,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (running_get_channel_difference(dialog_id)) {
LOG(INFO) << "Postpone channel update, because getChannelDifference is run";
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
return;
}
@ -7385,9 +7390,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
<< ", pts_count = " << pts_count << " in update from " << source;
if (d->was_opened || td_->contacts_manager_->get_channel_status(channel_id).is_member() ||
is_dialog_sponsored(d)) {
d->postponed_channel_updates.emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update pts mismatch");
} else {
promise.set_value(Unit());
@ -35864,17 +35867,19 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
if (d != nullptr) {
d->is_channel_difference_finished = true;
bool have_access = have_input_peer(dialog_id, AccessRights::Read);
if (!d->postponed_channel_updates.empty()) {
LOG(INFO) << "Begin to apply postponed channel updates";
while (!d->postponed_channel_updates.empty()) {
auto it = d->postponed_channel_updates.begin();
auto updates_it = postponed_channel_updates_.find(dialog_id);
if (updates_it != postponed_channel_updates_.end()) {
auto &updates = updates_it->second;
LOG(INFO) << "Begin to apply " << updates.size() << " postponed channel updates";
while (!updates.empty()) {
auto it = updates.begin();
auto update = std::move(it->second.update);
auto update_pts = it->second.pts;
auto update_pts_count = it->second.pts_count;
auto promise = std::move(it->second.promise);
d->postponed_channel_updates.erase(it);
updates.erase(it);
auto old_size = d->postponed_channel_updates.size();
auto old_size = updates.size();
auto update_id = update->get_id();
if (have_access) {
add_pending_channel_update(dialog_id, std::move(update), update_pts, update_pts_count, std::move(promise),
@ -35882,7 +35887,7 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
} else {
promise.set_value(Unit());
}
if (d->postponed_channel_updates.size() != old_size || running_get_channel_difference(dialog_id)) {
if (updates.size() != old_size || running_get_channel_difference(dialog_id)) {
if (success && update_pts - 10000 < d->pts && update_pts_count == 1) {
// if getChannelDifference was successful and update pts is near channel pts,
// we hope that the update eventually can be applied
@ -35893,10 +35898,10 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
<< " with pts " << d->pts << ", update pts is " << update_pts << ", update pts count is "
<< update_pts_count;
vector<Promise<Unit>> update_promises;
for (auto &postponed_update : d->postponed_channel_updates) {
for (auto &postponed_update : updates) {
update_promises.push_back(std::move(postponed_update.second.promise));
}
d->postponed_channel_updates.clear();
updates.clear();
for (auto &update_promise : update_promises) {
update_promise.set_value(Unit());
}
@ -35904,6 +35909,9 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
break;
}
}
if (updates.empty()) {
postponed_channel_updates_.erase(updates_it);
}
LOG(INFO) << "Finish to apply postponed channel updates";
}

View File

@ -1240,12 +1240,11 @@ class MessagesManager final : public Actor {
bool has_unload_timeout = false;
bool is_channel_difference_finished = false;
int32 pts = 0; // for channels only
std::multimap<int32, PendingPtsUpdate> postponed_channel_updates; // for channels only
int32 pending_read_channel_inbox_pts = 0; // for channels only
MessageId pending_read_channel_inbox_max_message_id; // for channels only
int32 pending_read_channel_inbox_server_unread_count = 0; // for channels only
std::unordered_map<int64, MessageId> random_id_to_message_id; // for secret chats only
int32 pts = 0; // for channels only
int32 pending_read_channel_inbox_pts = 0; // for channels only
MessageId pending_read_channel_inbox_max_message_id; // for channels only
int32 pending_read_channel_inbox_server_unread_count = 0; // for channels only
std::unordered_map<int64, MessageId> random_id_to_message_id; // for secret chats only
MessageId last_assigned_message_id; // identifier of the last local or yet unsent message, assigned after
// application start, used to guarantee that all assigned message identifiers
@ -1788,6 +1787,9 @@ class MessagesManager final : public Actor {
bool can_set_game_score(DialogId dialog_id, const Message *m) const;
void add_postponed_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, Promise<Unit> &&promise);
void process_channel_update(tl_object_ptr<telegram_api::Update> &&update);
void on_message_edited(FullMessageId full_message_id, int32 pts);
@ -3239,6 +3241,7 @@ class MessagesManager final : public Actor {
std::unordered_map<DialogId, string, DialogIdHash> active_get_channel_differencies_;
std::unordered_map<DialogId, uint64, DialogIdHash> get_channel_difference_to_log_event_id_;
std::unordered_map<DialogId, int32, DialogIdHash> channel_get_difference_retry_timeouts_;
std::unordered_map<DialogId, std::multimap<int32, PendingPtsUpdate>, DialogIdHash> postponed_channel_updates_;
MultiTimeout channel_get_difference_timeout_{"ChannelGetDifferenceTimeout"};
MultiTimeout channel_get_difference_retry_timeout_{"ChannelGetDifferenceRetryTimeout"};