Delay notification updates during getDifference and fix bugs.

GitOrigin-RevId: 05f5921a195a9b90b50773ce701ea115cc19e5c8
This commit is contained in:
levlam 2018-11-22 04:00:28 +03:00
parent 12e779bc99
commit 989b984455
4 changed files with 198 additions and 107 deletions

View File

@ -5024,7 +5024,7 @@ void MessagesManager::on_update_channel_too_long(tl_object_ptr<telegram_api::upd
int32 update_pts = (update->flags_ & UPDATE_CHANNEL_TO_LONG_FLAG_HAS_PTS) ? update->pts_ : 0;
if (d != nullptr) {
if (update_pts == 0 || update_pts > d->pts){
if (update_pts == 0 || update_pts > d->pts) {
get_channel_difference(dialog_id, d->pts, true, "on_update_channel_too_long 1");
}
} else {
@ -17408,6 +17408,12 @@ NotificationGroupId MessagesManager::get_dialog_message_notification_group_id(Di
d->message_notification_group_id = td_->notification_manager_->get_next_notification_group_id();
VLOG(notifications) << "Assign " << d->message_notification_group_id << " to " << d->dialog_id;
on_dialog_updated(d->dialog_id, "get_dialog_message_notification_group_id");
if (running_get_channel_difference(d->dialog_id) ||
get_channel_difference_to_logevent_id_.count(d->dialog_id) != 0) {
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_chat_difference,
d->message_notification_group_id);
}
}
CHECK(d->message_notification_group_id.is_valid());
@ -22016,6 +22022,12 @@ void MessagesManager::do_get_channel_difference(DialogId dialog_id, int32 pts, b
return;
}
const Dialog *d = get_dialog(dialog_id);
if (d != nullptr && d->message_notification_group_id.is_valid()) {
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_chat_difference,
d->message_notification_group_id);
}
int32 limit = td_->auth_manager_->is_bot() ? MAX_BOT_CHANNEL_DIFFERENCE : MAX_CHANNEL_DIFFERENCE;
if (pts <= 0) {
pts = 1;
@ -22353,6 +22365,11 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
LOG(INFO) << "Finish to apply postponed channel updates";
}
if (d != nullptr && d->message_notification_group_id.is_valid()) {
send_closure_later(td_->notification_manager_actor_, &NotificationManager::after_get_chat_difference,
d->message_notification_group_id);
}
if (postponed_chat_read_inbox_updates_.erase(dialog_id) > 0) {
send_update_chat_read_inbox(d, true, "after_get_channel_difference");
}

View File

@ -219,11 +219,9 @@ td_api::object_ptr<td_api::notification> NotificationManager::get_notification_o
}
void NotificationManager::add_update(int32 group_id, td_api::object_ptr<td_api::Update> update) {
// TODO delay updates while getDifference is running
// flush updates when getDifference or getChannelDiference is finished
VLOG(notifications) << "Add " << to_string(update);
pending_updates_[group_id].push_back(std::move(update));
if (!running_get_difference_) {
if (!running_get_difference_ && running_get_chat_difference_.count(group_id) == 0) {
flush_pending_updates_timeout_.add_timeout_in(group_id, MIN_UPDATE_DELAY_MS * 1e-3);
} else {
flush_pending_updates_timeout_.set_timeout_in(group_id, MAX_UPDATE_DELAY_MS * 1e-3);
@ -260,139 +258,159 @@ void NotificationManager::flush_pending_updates(int32 group_id) {
auto updates = std::move(it->second);
pending_updates_.erase(it);
std::unordered_map<int32, size_t> notification_pos;
size_t cur_pos = 1;
for (auto &update : updates) {
if (update->get_id() == td_api::updateNotificationGroup::ID) {
auto update_ptr = static_cast<td_api::updateNotificationGroup *>(update.get());
bool is_deletion = !update_ptr->removed_notification_ids_.empty() &&
(update_ptr->new_notifications_.empty() ||
update_ptr->new_notifications_.back()->id_ < update_ptr->removed_notification_ids_[0]);
bool is_changed = true;
while (is_changed) {
is_changed = false;
std::unordered_map<int32, size_t> notification_pos;
size_t cur_pos = 1;
for (auto &update : updates) {
if (update->get_id() == td_api::updateNotificationGroup::ID) {
auto update_ptr = static_cast<td_api::updateNotificationGroup *>(update.get());
bool is_deletion = !update_ptr->removed_notification_ids_.empty() &&
(update_ptr->new_notifications_.empty() ||
update_ptr->new_notifications_.back()->id_ < update_ptr->removed_notification_ids_[0]);
for (auto &notification : update_ptr->new_notifications_) {
auto notification_id = notification->id_;
auto &pos = notification_pos[notification_id];
CHECK(pos < cur_pos);
if (pos != 0) {
// this notification was deleted by previous update, but we can't remove the deletion or the addition
for (auto &notification : update_ptr->new_notifications_) {
auto notification_id = notification->id_;
auto &pos = notification_pos[notification_id];
CHECK(pos < cur_pos);
if (pos != 0) {
// this notification was deleted by previous update, but we can't remove the deletion or the addition
}
pos = cur_pos;
}
pos = cur_pos;
}
for (auto &notification_id : update_ptr->removed_notification_ids_) {
for (auto &notification_id : update_ptr->removed_notification_ids_) {
auto &pos = notification_pos[notification_id];
CHECK(pos < cur_pos);
if (pos == 0 || !is_deletion) {
pos = cur_pos;
} else {
// this notification was added by previous update, we can remove the addition and the deletion
auto &previous_update = updates[pos - 1];
CHECK(previous_update != nullptr);
if (previous_update->get_id() == td_api::updateNotificationGroup::ID) {
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(previous_update.get());
bool found = false;
size_t i = 0;
for (auto &notification : previous_update_ptr->new_notifications_) {
if (notification->id_ == notification_id) {
previous_update_ptr->new_notifications_.erase(previous_update_ptr->new_notifications_.begin() + i);
found = true;
break;
}
i++;
}
CHECK(found); // there should be no deletions without previous addition
if (previous_update_ptr->new_notifications_.empty()) {
if (previous_update_ptr->removed_notification_ids_.empty()) {
previous_update = nullptr;
} else {
previous_update_ptr->notification_settings_chat_id_ = previous_update_ptr->chat_id_;
}
}
} else {
auto previous_update_ptr = static_cast<td_api::updateNotification *>(previous_update.get());
CHECK(previous_update_ptr->notification_->id_ == notification_id);
previous_update = nullptr;
}
is_changed = true;
notification_id = 0;
pos = 0;
}
}
update_ptr->removed_notification_ids_.erase(
std::remove_if(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end(),
[](auto &notification_id) { return notification_id == 0; }),
update_ptr->removed_notification_ids_.end());
if (update_ptr->removed_notification_ids_.empty() && update_ptr->new_notifications_.empty()) {
for (size_t i = cur_pos - 1; i > 0; i--) {
if (updates[i - 1] != nullptr && updates[i - 1]->get_id() == td_api::updateNotificationGroup::ID) {
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[i - 1].get());
previous_update_ptr->total_count_ = update_ptr->total_count_;
update = nullptr;
break;
}
}
if (update != nullptr && update_ptr->total_count_ == 0) {
update = nullptr;
}
}
} else {
auto update_ptr = static_cast<td_api::updateNotification *>(update.get());
auto notification_id = update_ptr->notification_->id_;
auto &pos = notification_pos[notification_id];
CHECK(pos < cur_pos);
if (pos == 0 || !is_deletion) {
if (pos == 0) {
pos = cur_pos;
} else {
// this notification was added by previous update, we can remove the addition and the deletion
VLOG(notifications) << "Previous update with " << notification_id
<< " is not sent, so we can edit the notification in-place";
auto type = std::move(update_ptr->notification_->type_);
auto &previous_update = updates[pos - 1];
CHECK(previous_update != nullptr);
if (previous_update->get_id() == td_api::updateNotificationGroup::ID) {
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(previous_update.get());
bool found = false;
size_t i = 0;
for (auto &notification : previous_update_ptr->new_notifications_) {
if (notification->id_ == notification_id) {
previous_update_ptr->new_notifications_.erase(previous_update_ptr->new_notifications_.begin() + i);
notification->type_ = std::move(type);
found = true;
break;
}
i++;
}
CHECK(found); // there should be no deletions without previous addition
if (previous_update_ptr->new_notifications_.empty() &&
previous_update_ptr->removed_notification_ids_.empty()) {
previous_update = nullptr;
}
CHECK(found); // there should be no update about editing of deleted message
} else {
auto previous_update_ptr = static_cast<td_api::updateNotification *>(previous_update.get());
CHECK(previous_update_ptr->notification_->id_ == notification_id);
previous_update = nullptr;
previous_update_ptr->notification_->type_ = std::move(type);
}
notification_id = 0;
pos = 0;
is_changed = true;
update = nullptr;
}
}
cur_pos++;
}
update_ptr->removed_notification_ids_.erase(
std::remove_if(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end(),
[](auto &notification_id) { return notification_id == 0; }),
update_ptr->removed_notification_ids_.end());
if (update_ptr->removed_notification_ids_.empty() && update_ptr->new_notifications_.empty()) {
update = nullptr;
}
} else {
auto update_ptr = static_cast<td_api::updateNotification *>(update.get());
auto notification_id = update_ptr->notification_->id_;
auto &pos = notification_pos[notification_id];
if (pos == 0) {
pos = cur_pos;
} else {
VLOG(notifications) << "Previous update with " << notification_id
<< " is not sent, so we can edit the notification in-place";
auto type = std::move(update_ptr->notification_->type_);
auto &previous_update = updates[pos - 1];
CHECK(previous_update != nullptr);
updates.erase(std::remove_if(updates.begin(), updates.end(), [](auto &update) { return update == nullptr; }),
updates.end());
if (updates.empty()) {
return;
}
if (previous_update->get_id() == td_api::updateNotificationGroup::ID) {
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(previous_update.get());
bool found = false;
for (auto &notification : previous_update_ptr->new_notifications_) {
if (notification->id_ == notification_id) {
notification->type_ = std::move(type);
found = true;
break;
}
size_t last_update_pos = 0;
for (size_t i = 1; i < updates.size(); i++) {
if (updates[last_update_pos]->get_id() == td_api::updateNotificationGroup::ID &&
updates[i]->get_id() == td_api::updateNotificationGroup::ID) {
auto last_update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[last_update_pos].get());
auto update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[i].get());
if (last_update_ptr->notification_settings_chat_id_ == update_ptr->notification_settings_chat_id_ &&
last_update_ptr->is_silent_ == update_ptr->is_silent_) {
if ((last_update_ptr->new_notifications_.empty() && update_ptr->new_notifications_.empty()) ||
(last_update_ptr->removed_notification_ids_.empty() && update_ptr->removed_notification_ids_.empty())) {
// combine updates
VLOG(notifications) << "Combine " << to_string(*last_update_ptr) << " and " << to_string(*update_ptr);
CHECK(last_update_ptr->notification_group_id_ == update_ptr->notification_group_id_);
CHECK(last_update_ptr->chat_id_ == update_ptr->chat_id_);
last_update_ptr->total_count_ = update_ptr->total_count_;
append(last_update_ptr->new_notifications_, std::move(update_ptr->new_notifications_));
append(last_update_ptr->removed_notification_ids_, std::move(update_ptr->removed_notification_ids_));
updates[i] = nullptr;
is_changed = true;
continue;
}
CHECK(found); // there should be no update about editing of deleted message
} else {
auto previous_update_ptr = static_cast<td_api::updateNotification *>(previous_update.get());
CHECK(previous_update_ptr->notification_->id_ == notification_id);
previous_update_ptr->notification_->type_ = std::move(type);
}
update = nullptr;
}
}
cur_pos++;
}
updates.erase(std::remove_if(updates.begin(), updates.end(), [](auto &update) { return update == nullptr; }),
updates.end());
if (updates.empty()) {
return;
}
size_t last_update_pos = 0;
for (size_t i = 1; i < updates.size(); i++) {
if (updates[last_update_pos]->get_id() == td_api::updateNotificationGroup::ID &&
updates[i]->get_id() == td_api::updateNotificationGroup::ID) {
auto last_update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[last_update_pos].get());
auto update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[i].get());
if (last_update_ptr->notification_settings_chat_id_ == update_ptr->notification_settings_chat_id_ &&
last_update_ptr->is_silent_ == update_ptr->is_silent_) {
if ((last_update_ptr->new_notifications_.empty() && update_ptr->new_notifications_.empty()) ||
(last_update_ptr->removed_notification_ids_.empty() && update_ptr->removed_notification_ids_.empty())) {
// combine updates
VLOG(notifications) << "Combine " << to_string(*last_update_ptr) << " and " << to_string(*update_ptr);
CHECK(last_update_ptr->notification_group_id_ == update_ptr->notification_group_id_);
CHECK(last_update_ptr->chat_id_ == update_ptr->chat_id_);
last_update_ptr->total_count_ = update_ptr->total_count_;
append(last_update_ptr->new_notifications_, std::move(update_ptr->new_notifications_));
append(last_update_ptr->removed_notification_ids_, std::move(update_ptr->removed_notification_ids_));
updates[i] = nullptr;
continue;
}
}
last_update_pos++;
if (last_update_pos != i) {
updates[last_update_pos] = std::move(updates[i]);
}
}
last_update_pos++;
if (last_update_pos != i) {
updates[last_update_pos] = std::move(updates[i]);
}
updates.resize(last_update_pos + 1);
}
updates.resize(last_update_pos + 1);
for (auto &update : updates) {
VLOG(notifications) << "Send " << to_string(update);
@ -600,6 +618,9 @@ void NotificationManager::edit_notification(NotificationGroupId group_id, Notifi
void NotificationManager::on_notifications_removed(
NotificationGroups::iterator &&group_it, vector<td_api::object_ptr<td_api::notification>> &&added_notifications,
vector<int32> &&removed_notification_ids) {
VLOG(notifications) << "In on_notifications_removed for " << group_it->first.group_id << " with "
<< added_notifications.size() << " added notifications and " << removed_notification_ids.size()
<< " removed notifications";
auto group_key = group_it->first;
auto final_group_key = group_key;
final_group_key.last_notification_date = 0;
@ -687,6 +708,7 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
// notification is still pending, just delete it
group_it->second.pending_notifications.erase(it);
if (group_it->second.pending_notifications.empty()) {
group_it->second.pending_notifications_flush_time = 0;
flush_pending_notifications_timeout_.cancel_timeout(group_id.get());
}
return promise.set_value(Unit());
@ -747,7 +769,8 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
return promise.set_value(Unit());
}
VLOG(notifications) << "Remove " << group_id << " up to " << max_notification_id << " or " << max_message_id;
VLOG(notifications) << "Remove " << group_id << " up to " << max_notification_id << " or " << max_message_id
<< " with new_total_count = " << new_total_count;
if (max_notification_id.is_valid()) {
// TODO remove notifications from database by max_notification_id, save that they are removed
@ -755,6 +778,7 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
auto group_it = get_group(group_id);
if (group_it == groups_.end()) {
VLOG(notifications) << "Can't find " << group_id;
// TODO synchronously load the group
return promise.set_value(Unit());
}
@ -769,6 +793,7 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
}
group_it->second.pending_notifications.erase(group_it->second.pending_notifications.begin(), pending_delete_end);
if (group_it->second.pending_notifications.empty()) {
group_it->second.pending_notifications_flush_time = 0;
flush_pending_notifications_timeout_.cancel_timeout(group_id.get());
}
if (new_total_count != -1) {
@ -812,6 +837,9 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
if (new_total_count != -1 || !removed_notification_ids.empty()) {
on_notifications_removed(std::move(group_it), vector<td_api::object_ptr<td_api::notification>>(),
std::move(removed_notification_ids));
} else {
VLOG(notifications) << "Have new_total_count = " << new_total_count << " and " << removed_notification_ids.size()
<< " removed notifications";
}
promise.set_value(Unit());
}
@ -884,4 +912,38 @@ void NotificationManager::on_notification_default_delay_changed() {
VLOG(notifications) << "Set notification_default_delay_ms to " << notification_default_delay_ms_;
}
void NotificationManager::before_get_difference() {
running_get_difference_ = true;
}
void NotificationManager::after_get_difference() {
CHECK(running_get_difference_);
running_get_difference_ = false;
vector<int32> ready_group_ids;
for (auto &it : pending_updates_) {
if (running_get_chat_difference_.count(it.first) == 0) {
ready_group_ids.push_back(it.first);
}
}
for (auto group_id : ready_group_ids) {
flush_pending_updates_timeout_.cancel_timeout(group_id);
flush_pending_updates(group_id);
}
}
void NotificationManager::before_get_chat_difference(NotificationGroupId group_id) {
VLOG(notifications) << "Before get chat diference in " << group_id;
bool is_inserted = running_get_chat_difference_.insert(group_id.get()).second;
CHECK(is_inserted);
}
void NotificationManager::after_get_chat_difference(NotificationGroupId group_id) {
VLOG(notifications) << "After get chat diference in " << group_id;
auto erased_count = running_get_chat_difference_.erase(group_id.get());
if (erased_count == 1 && !running_get_difference_ && pending_updates_.count(group_id.get()) == 1) {
flush_pending_updates_timeout_.cancel_timeout(group_id.get());
flush_pending_updates(group_id.get());
}
}
} // namespace td

View File

@ -67,6 +67,14 @@ class NotificationManager : public Actor {
void on_notification_default_delay_changed();
void before_get_difference();
void after_get_difference();
void before_get_chat_difference(NotificationGroupId group_id);
void after_get_chat_difference(NotificationGroupId group_id);
private:
static constexpr int32 DEFAULT_GROUP_COUNT_MAX = 10;
static constexpr int32 DEFAULT_GROUP_SIZE_MAX = 10;
@ -183,6 +191,7 @@ class NotificationManager : public Actor {
int32 notification_default_delay_ms_ = DEFAULT_DEFAULT_DELAY_MS;
bool running_get_difference_ = false;
std::unordered_set<int32> running_get_chat_difference_;
NotificationGroups groups_;

View File

@ -25,6 +25,7 @@
#include "td/telegram/MessagesManager.h"
#include "td/telegram/net/DcOptions.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/NotificationManager.h"
#include "td/telegram/Payments.h"
#include "td/telegram/PrivacyManager.h"
#include "td/telegram/SecretChatId.h"
@ -262,6 +263,7 @@ void UpdatesManager::before_get_difference() {
td_->messages_manager_->before_get_difference();
send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts());
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
}
Promise<> UpdatesManager::add_pts(int32 pts) {
@ -1088,6 +1090,7 @@ void UpdatesManager::after_get_difference() {
td_->inline_queries_manager_->after_get_difference();
td_->messages_manager_->after_get_difference();
send_closure_later(td_->notification_manager_actor_, &NotificationManager::after_get_difference);
send_closure(G()->state_manager(), &StateManager::on_synchronized, true);
set_state(State::Type::General);