diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 0c748ea2..0b2a5d59 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -5024,7 +5024,7 @@ void MessagesManager::on_update_channel_too_long(tl_object_ptrflags_ & UPDATE_CHANNEL_TO_LONG_FLAG_HAS_PTS) ? update->pts_ : 0; if (d != nullptr) { - if (update_pts == 0 || update_pts > d->pts){ + if (update_pts == 0 || update_pts > d->pts) { get_channel_difference(dialog_id, d->pts, true, "on_update_channel_too_long 1"); } } else { @@ -17408,6 +17408,12 @@ NotificationGroupId MessagesManager::get_dialog_message_notification_group_id(Di d->message_notification_group_id = td_->notification_manager_->get_next_notification_group_id(); VLOG(notifications) << "Assign " << d->message_notification_group_id << " to " << d->dialog_id; on_dialog_updated(d->dialog_id, "get_dialog_message_notification_group_id"); + + if (running_get_channel_difference(d->dialog_id) || + get_channel_difference_to_logevent_id_.count(d->dialog_id) != 0) { + send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_chat_difference, + d->message_notification_group_id); + } } CHECK(d->message_notification_group_id.is_valid()); @@ -22016,6 +22022,12 @@ void MessagesManager::do_get_channel_difference(DialogId dialog_id, int32 pts, b return; } + const Dialog *d = get_dialog(dialog_id); + if (d != nullptr && d->message_notification_group_id.is_valid()) { + send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_chat_difference, + d->message_notification_group_id); + } + int32 limit = td_->auth_manager_->is_bot() ? MAX_BOT_CHANNEL_DIFFERENCE : MAX_CHANNEL_DIFFERENCE; if (pts <= 0) { pts = 1; @@ -22353,6 +22365,11 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ LOG(INFO) << "Finish to apply postponed channel updates"; } + if (d != nullptr && d->message_notification_group_id.is_valid()) { + send_closure_later(td_->notification_manager_actor_, &NotificationManager::after_get_chat_difference, + d->message_notification_group_id); + } + if (postponed_chat_read_inbox_updates_.erase(dialog_id) > 0) { send_update_chat_read_inbox(d, true, "after_get_channel_difference"); } diff --git a/td/telegram/NotificationManager.cpp b/td/telegram/NotificationManager.cpp index d182adb7..600e6632 100644 --- a/td/telegram/NotificationManager.cpp +++ b/td/telegram/NotificationManager.cpp @@ -219,11 +219,9 @@ td_api::object_ptr NotificationManager::get_notification_o } void NotificationManager::add_update(int32 group_id, td_api::object_ptr update) { - // TODO delay updates while getDifference is running - // flush updates when getDifference or getChannelDiference is finished VLOG(notifications) << "Add " << to_string(update); pending_updates_[group_id].push_back(std::move(update)); - if (!running_get_difference_) { + if (!running_get_difference_ && running_get_chat_difference_.count(group_id) == 0) { flush_pending_updates_timeout_.add_timeout_in(group_id, MIN_UPDATE_DELAY_MS * 1e-3); } else { flush_pending_updates_timeout_.set_timeout_in(group_id, MAX_UPDATE_DELAY_MS * 1e-3); @@ -260,139 +258,159 @@ void NotificationManager::flush_pending_updates(int32 group_id) { auto updates = std::move(it->second); pending_updates_.erase(it); - std::unordered_map notification_pos; - size_t cur_pos = 1; - for (auto &update : updates) { - if (update->get_id() == td_api::updateNotificationGroup::ID) { - auto update_ptr = static_cast(update.get()); - bool is_deletion = !update_ptr->removed_notification_ids_.empty() && - (update_ptr->new_notifications_.empty() || - update_ptr->new_notifications_.back()->id_ < update_ptr->removed_notification_ids_[0]); + bool is_changed = true; + while (is_changed) { + is_changed = false; + std::unordered_map notification_pos; + size_t cur_pos = 1; + for (auto &update : updates) { + if (update->get_id() == td_api::updateNotificationGroup::ID) { + auto update_ptr = static_cast(update.get()); + bool is_deletion = !update_ptr->removed_notification_ids_.empty() && + (update_ptr->new_notifications_.empty() || + update_ptr->new_notifications_.back()->id_ < update_ptr->removed_notification_ids_[0]); - for (auto ¬ification : update_ptr->new_notifications_) { - auto notification_id = notification->id_; - auto &pos = notification_pos[notification_id]; - CHECK(pos < cur_pos); - if (pos != 0) { - // this notification was deleted by previous update, but we can't remove the deletion or the addition + for (auto ¬ification : update_ptr->new_notifications_) { + auto notification_id = notification->id_; + auto &pos = notification_pos[notification_id]; + CHECK(pos < cur_pos); + if (pos != 0) { + // this notification was deleted by previous update, but we can't remove the deletion or the addition + } + pos = cur_pos; } - pos = cur_pos; - } - for (auto ¬ification_id : update_ptr->removed_notification_ids_) { + for (auto ¬ification_id : update_ptr->removed_notification_ids_) { + auto &pos = notification_pos[notification_id]; + CHECK(pos < cur_pos); + if (pos == 0 || !is_deletion) { + pos = cur_pos; + } else { + // this notification was added by previous update, we can remove the addition and the deletion + auto &previous_update = updates[pos - 1]; + CHECK(previous_update != nullptr); + + if (previous_update->get_id() == td_api::updateNotificationGroup::ID) { + auto previous_update_ptr = static_cast(previous_update.get()); + bool found = false; + size_t i = 0; + for (auto ¬ification : previous_update_ptr->new_notifications_) { + if (notification->id_ == notification_id) { + previous_update_ptr->new_notifications_.erase(previous_update_ptr->new_notifications_.begin() + i); + found = true; + break; + } + i++; + } + CHECK(found); // there should be no deletions without previous addition + if (previous_update_ptr->new_notifications_.empty()) { + if (previous_update_ptr->removed_notification_ids_.empty()) { + previous_update = nullptr; + } else { + previous_update_ptr->notification_settings_chat_id_ = previous_update_ptr->chat_id_; + } + } + } else { + auto previous_update_ptr = static_cast(previous_update.get()); + CHECK(previous_update_ptr->notification_->id_ == notification_id); + previous_update = nullptr; + } + + is_changed = true; + notification_id = 0; + pos = 0; + } + } + + update_ptr->removed_notification_ids_.erase( + std::remove_if(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end(), + [](auto ¬ification_id) { return notification_id == 0; }), + update_ptr->removed_notification_ids_.end()); + if (update_ptr->removed_notification_ids_.empty() && update_ptr->new_notifications_.empty()) { + for (size_t i = cur_pos - 1; i > 0; i--) { + if (updates[i - 1] != nullptr && updates[i - 1]->get_id() == td_api::updateNotificationGroup::ID) { + auto previous_update_ptr = static_cast(updates[i - 1].get()); + previous_update_ptr->total_count_ = update_ptr->total_count_; + update = nullptr; + break; + } + } + if (update != nullptr && update_ptr->total_count_ == 0) { + update = nullptr; + } + } + } else { + auto update_ptr = static_cast(update.get()); + auto notification_id = update_ptr->notification_->id_; auto &pos = notification_pos[notification_id]; - CHECK(pos < cur_pos); - if (pos == 0 || !is_deletion) { + if (pos == 0) { pos = cur_pos; } else { - // this notification was added by previous update, we can remove the addition and the deletion + VLOG(notifications) << "Previous update with " << notification_id + << " is not sent, so we can edit the notification in-place"; + auto type = std::move(update_ptr->notification_->type_); auto &previous_update = updates[pos - 1]; CHECK(previous_update != nullptr); if (previous_update->get_id() == td_api::updateNotificationGroup::ID) { auto previous_update_ptr = static_cast(previous_update.get()); bool found = false; - size_t i = 0; for (auto ¬ification : previous_update_ptr->new_notifications_) { if (notification->id_ == notification_id) { - previous_update_ptr->new_notifications_.erase(previous_update_ptr->new_notifications_.begin() + i); + notification->type_ = std::move(type); found = true; break; } - i++; - } - CHECK(found); // there should be no deletions without previous addition - if (previous_update_ptr->new_notifications_.empty() && - previous_update_ptr->removed_notification_ids_.empty()) { - previous_update = nullptr; } + CHECK(found); // there should be no update about editing of deleted message } else { auto previous_update_ptr = static_cast(previous_update.get()); CHECK(previous_update_ptr->notification_->id_ == notification_id); - previous_update = nullptr; + previous_update_ptr->notification_->type_ = std::move(type); } - notification_id = 0; - pos = 0; + is_changed = true; + update = nullptr; } } + cur_pos++; + } - update_ptr->removed_notification_ids_.erase( - std::remove_if(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end(), - [](auto ¬ification_id) { return notification_id == 0; }), - update_ptr->removed_notification_ids_.end()); - if (update_ptr->removed_notification_ids_.empty() && update_ptr->new_notifications_.empty()) { - update = nullptr; - } - } else { - auto update_ptr = static_cast(update.get()); - auto notification_id = update_ptr->notification_->id_; - auto &pos = notification_pos[notification_id]; - if (pos == 0) { - pos = cur_pos; - } else { - VLOG(notifications) << "Previous update with " << notification_id - << " is not sent, so we can edit the notification in-place"; - auto type = std::move(update_ptr->notification_->type_); - auto &previous_update = updates[pos - 1]; - CHECK(previous_update != nullptr); + updates.erase(std::remove_if(updates.begin(), updates.end(), [](auto &update) { return update == nullptr; }), + updates.end()); + if (updates.empty()) { + return; + } - if (previous_update->get_id() == td_api::updateNotificationGroup::ID) { - auto previous_update_ptr = static_cast(previous_update.get()); - bool found = false; - for (auto ¬ification : previous_update_ptr->new_notifications_) { - if (notification->id_ == notification_id) { - notification->type_ = std::move(type); - found = true; - break; - } + size_t last_update_pos = 0; + for (size_t i = 1; i < updates.size(); i++) { + if (updates[last_update_pos]->get_id() == td_api::updateNotificationGroup::ID && + updates[i]->get_id() == td_api::updateNotificationGroup::ID) { + auto last_update_ptr = static_cast(updates[last_update_pos].get()); + auto update_ptr = static_cast(updates[i].get()); + if (last_update_ptr->notification_settings_chat_id_ == update_ptr->notification_settings_chat_id_ && + last_update_ptr->is_silent_ == update_ptr->is_silent_) { + if ((last_update_ptr->new_notifications_.empty() && update_ptr->new_notifications_.empty()) || + (last_update_ptr->removed_notification_ids_.empty() && update_ptr->removed_notification_ids_.empty())) { + // combine updates + VLOG(notifications) << "Combine " << to_string(*last_update_ptr) << " and " << to_string(*update_ptr); + CHECK(last_update_ptr->notification_group_id_ == update_ptr->notification_group_id_); + CHECK(last_update_ptr->chat_id_ == update_ptr->chat_id_); + last_update_ptr->total_count_ = update_ptr->total_count_; + append(last_update_ptr->new_notifications_, std::move(update_ptr->new_notifications_)); + append(last_update_ptr->removed_notification_ids_, std::move(update_ptr->removed_notification_ids_)); + updates[i] = nullptr; + is_changed = true; + continue; } - CHECK(found); // there should be no update about editing of deleted message - } else { - auto previous_update_ptr = static_cast(previous_update.get()); - CHECK(previous_update_ptr->notification_->id_ == notification_id); - previous_update_ptr->notification_->type_ = std::move(type); - } - - update = nullptr; - } - } - cur_pos++; - } - - updates.erase(std::remove_if(updates.begin(), updates.end(), [](auto &update) { return update == nullptr; }), - updates.end()); - if (updates.empty()) { - return; - } - - size_t last_update_pos = 0; - for (size_t i = 1; i < updates.size(); i++) { - if (updates[last_update_pos]->get_id() == td_api::updateNotificationGroup::ID && - updates[i]->get_id() == td_api::updateNotificationGroup::ID) { - auto last_update_ptr = static_cast(updates[last_update_pos].get()); - auto update_ptr = static_cast(updates[i].get()); - if (last_update_ptr->notification_settings_chat_id_ == update_ptr->notification_settings_chat_id_ && - last_update_ptr->is_silent_ == update_ptr->is_silent_) { - if ((last_update_ptr->new_notifications_.empty() && update_ptr->new_notifications_.empty()) || - (last_update_ptr->removed_notification_ids_.empty() && update_ptr->removed_notification_ids_.empty())) { - // combine updates - VLOG(notifications) << "Combine " << to_string(*last_update_ptr) << " and " << to_string(*update_ptr); - CHECK(last_update_ptr->notification_group_id_ == update_ptr->notification_group_id_); - CHECK(last_update_ptr->chat_id_ == update_ptr->chat_id_); - last_update_ptr->total_count_ = update_ptr->total_count_; - append(last_update_ptr->new_notifications_, std::move(update_ptr->new_notifications_)); - append(last_update_ptr->removed_notification_ids_, std::move(update_ptr->removed_notification_ids_)); - updates[i] = nullptr; - continue; } } + last_update_pos++; + if (last_update_pos != i) { + updates[last_update_pos] = std::move(updates[i]); + } } - last_update_pos++; - if (last_update_pos != i) { - updates[last_update_pos] = std::move(updates[i]); - } + updates.resize(last_update_pos + 1); } - updates.resize(last_update_pos + 1); for (auto &update : updates) { VLOG(notifications) << "Send " << to_string(update); @@ -600,6 +618,9 @@ void NotificationManager::edit_notification(NotificationGroupId group_id, Notifi void NotificationManager::on_notifications_removed( NotificationGroups::iterator &&group_it, vector> &&added_notifications, vector &&removed_notification_ids) { + VLOG(notifications) << "In on_notifications_removed for " << group_it->first.group_id << " with " + << added_notifications.size() << " added notifications and " << removed_notification_ids.size() + << " removed notifications"; auto group_key = group_it->first; auto final_group_key = group_key; final_group_key.last_notification_date = 0; @@ -687,6 +708,7 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti // notification is still pending, just delete it group_it->second.pending_notifications.erase(it); if (group_it->second.pending_notifications.empty()) { + group_it->second.pending_notifications_flush_time = 0; flush_pending_notifications_timeout_.cancel_timeout(group_id.get()); } return promise.set_value(Unit()); @@ -747,7 +769,8 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id return promise.set_value(Unit()); } - VLOG(notifications) << "Remove " << group_id << " up to " << max_notification_id << " or " << max_message_id; + VLOG(notifications) << "Remove " << group_id << " up to " << max_notification_id << " or " << max_message_id + << " with new_total_count = " << new_total_count; if (max_notification_id.is_valid()) { // TODO remove notifications from database by max_notification_id, save that they are removed @@ -755,6 +778,7 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id auto group_it = get_group(group_id); if (group_it == groups_.end()) { + VLOG(notifications) << "Can't find " << group_id; // TODO synchronously load the group return promise.set_value(Unit()); } @@ -769,6 +793,7 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id } group_it->second.pending_notifications.erase(group_it->second.pending_notifications.begin(), pending_delete_end); if (group_it->second.pending_notifications.empty()) { + group_it->second.pending_notifications_flush_time = 0; flush_pending_notifications_timeout_.cancel_timeout(group_id.get()); } if (new_total_count != -1) { @@ -812,6 +837,9 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id if (new_total_count != -1 || !removed_notification_ids.empty()) { on_notifications_removed(std::move(group_it), vector>(), std::move(removed_notification_ids)); + } else { + VLOG(notifications) << "Have new_total_count = " << new_total_count << " and " << removed_notification_ids.size() + << " removed notifications"; } promise.set_value(Unit()); } @@ -884,4 +912,38 @@ void NotificationManager::on_notification_default_delay_changed() { VLOG(notifications) << "Set notification_default_delay_ms to " << notification_default_delay_ms_; } +void NotificationManager::before_get_difference() { + running_get_difference_ = true; +} + +void NotificationManager::after_get_difference() { + CHECK(running_get_difference_); + running_get_difference_ = false; + vector ready_group_ids; + for (auto &it : pending_updates_) { + if (running_get_chat_difference_.count(it.first) == 0) { + ready_group_ids.push_back(it.first); + } + } + for (auto group_id : ready_group_ids) { + flush_pending_updates_timeout_.cancel_timeout(group_id); + flush_pending_updates(group_id); + } +} + +void NotificationManager::before_get_chat_difference(NotificationGroupId group_id) { + VLOG(notifications) << "Before get chat diference in " << group_id; + bool is_inserted = running_get_chat_difference_.insert(group_id.get()).second; + CHECK(is_inserted); +} + +void NotificationManager::after_get_chat_difference(NotificationGroupId group_id) { + VLOG(notifications) << "After get chat diference in " << group_id; + auto erased_count = running_get_chat_difference_.erase(group_id.get()); + if (erased_count == 1 && !running_get_difference_ && pending_updates_.count(group_id.get()) == 1) { + flush_pending_updates_timeout_.cancel_timeout(group_id.get()); + flush_pending_updates(group_id.get()); + } +} + } // namespace td diff --git a/td/telegram/NotificationManager.h b/td/telegram/NotificationManager.h index e9b52bc0..abdc46a1 100644 --- a/td/telegram/NotificationManager.h +++ b/td/telegram/NotificationManager.h @@ -67,6 +67,14 @@ class NotificationManager : public Actor { void on_notification_default_delay_changed(); + void before_get_difference(); + + void after_get_difference(); + + void before_get_chat_difference(NotificationGroupId group_id); + + void after_get_chat_difference(NotificationGroupId group_id); + private: static constexpr int32 DEFAULT_GROUP_COUNT_MAX = 10; static constexpr int32 DEFAULT_GROUP_SIZE_MAX = 10; @@ -183,6 +191,7 @@ class NotificationManager : public Actor { int32 notification_default_delay_ms_ = DEFAULT_DEFAULT_DELAY_MS; bool running_get_difference_ = false; + std::unordered_set running_get_chat_difference_; NotificationGroups groups_; diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index ba553e3f..03f2c04e 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -25,6 +25,7 @@ #include "td/telegram/MessagesManager.h" #include "td/telegram/net/DcOptions.h" #include "td/telegram/net/NetQuery.h" +#include "td/telegram/NotificationManager.h" #include "td/telegram/Payments.h" #include "td/telegram/PrivacyManager.h" #include "td/telegram/SecretChatId.h" @@ -262,6 +263,7 @@ void UpdatesManager::before_get_difference() { td_->messages_manager_->before_get_difference(); send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts()); + send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } Promise<> UpdatesManager::add_pts(int32 pts) { @@ -1088,6 +1090,7 @@ void UpdatesManager::after_get_difference() { td_->inline_queries_manager_->after_get_difference(); td_->messages_manager_->after_get_difference(); + send_closure_later(td_->notification_manager_actor_, &NotificationManager::after_get_difference); send_closure(G()->state_manager(), &StateManager::on_synchronized, true); set_state(State::Type::General);