diff --git a/td/telegram/NotificationManager.cpp b/td/telegram/NotificationManager.cpp index 7a0592cb..87330176 100644 --- a/td/telegram/NotificationManager.cpp +++ b/td/telegram/NotificationManager.cpp @@ -156,7 +156,7 @@ NotificationGroupId NotificationManager::get_next_notification_group_id() { } NotificationManager::NotificationGroupKey NotificationManager::get_last_updated_group_key() const { - int left = max_notification_group_count_; + int32 left = max_notification_group_count_; auto it = groups_.begin(); while (it != groups_.end() && left > 1) { ++it; @@ -624,6 +624,27 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour } } +void NotificationManager::flush_all_pending_updates(bool include_delayed_chats, const char *source) { + vector ready_group_keys; + for (auto &it : pending_updates_) { + if (include_delayed_chats || running_get_chat_difference_.count(it.first) == 0) { + auto group_it = get_group(NotificationGroupId(it.first)); + CHECK(group_it != groups_.end()); + ready_group_keys.push_back(group_it->first); + } + } + + // flush groups in reverse order to not exceed max_notification_group_count_ + std::sort(ready_group_keys.begin(), ready_group_keys.end()); + for (auto group_key : reversed(ready_group_keys)) { + flush_pending_updates_timeout_.cancel_timeout(group_key.group_id.get()); + flush_pending_updates(group_key.group_id.get(), "after_get_difference"); + } + if (include_delayed_chats) { + CHECK(pending_updates_.empty()); + } +} + void NotificationManager::do_flush_pending_notifications(NotificationGroupKey &group_key, NotificationGroup &group, vector &pending_notifications) { if (pending_notifications.empty()) { @@ -793,6 +814,21 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_ groups_.emplace(std::move(final_group_key), std::move(group)); } +void NotificationManager::flush_all_pending_notifications() { + std::multimap group_ids; + for (auto &group_it : groups_) { + if (!group_it.second.pending_notifications.empty()) { + group_ids.emplace(group_it.second.pending_notifications.back().date, group_it.first.group_id); + } + } + + // flush groups in order of last notification date + for (auto &it : group_ids) { + flush_pending_notifications_timeout_.cancel_timeout(it.second.get()); + flush_pending_notifications(it.second); + } +} + void NotificationManager::edit_notification(NotificationGroupId group_id, NotificationId notification_id, unique_ptr type) { if (is_disabled()) { @@ -804,6 +840,9 @@ void NotificationManager::edit_notification(NotificationGroupId group_id, Notifi VLOG(notifications) << "Edit " << notification_id << ": " << *type; auto group_it = get_group(group_id); + if (group_it == groups_.end()) { + return; + } auto &group = group_it->second; for (size_t i = 0; i < group.notifications.size(); i++) { auto ¬ification = group.notifications[i]; @@ -885,8 +924,7 @@ void NotificationManager::on_notifications_removed( /* if (last_loaded_group_key_ < last_group_key) { - TODO - load_notification_groups_from_database(); + TODO load_notification_groups_from_database(); } */ } @@ -939,6 +977,9 @@ void NotificationManager::remove_added_notifications_from_pending_updates( void NotificationManager::remove_notification(NotificationGroupId group_id, NotificationId notification_id, bool is_permanent, Promise &&promise) { + if (!group_id.is_valid()) { + return promise.set_error(Status::Error(400, "Notification group identifier is invalid")); + } if (!notification_id.is_valid()) { return promise.set_error(Status::Error(400, "Notification identifier is invalid")); } @@ -1127,18 +1168,48 @@ void NotificationManager::on_notification_group_count_max_changed() { CHECK(MIN_NOTIFICATION_GROUP_COUNT_MAX <= new_max_notification_group_count && new_max_notification_group_count <= MAX_NOTIFICATION_GROUP_COUNT_MAX); - if (static_cast(new_max_notification_group_count) == max_notification_group_count_) { + auto new_max_notification_group_count_size_t = static_cast(new_max_notification_group_count); + if (new_max_notification_group_count_size_t == max_notification_group_count_) { return; } VLOG(notifications) << "Change max notification group count from " << max_notification_group_count_ << " to " << new_max_notification_group_count; + bool is_increased = new_max_notification_group_count_size_t > max_notification_group_count_; if (max_notification_group_count_ != 0) { - // TODO + flush_all_pending_notifications(); + flush_all_pending_updates(true, "on_notification_group_size_max_changed begin"); + + size_t cur_pos = 0; + size_t min_group_count = min(new_max_notification_group_count_size_t, max_notification_group_count_); + size_t max_group_count = max(new_max_notification_group_count_size_t, max_notification_group_count_); + for (auto it = groups_.begin(); it != groups_.end() && cur_pos < max_group_count; ++it, cur_pos++) { + if (cur_pos < min_group_count) { + continue; + } + + auto &group_key = it->first; + auto &group = it->second; + CHECK(group.pending_notifications.empty()); + CHECK(pending_updates_.count(group_key.group_id.get()) == 0); + + if (is_increased) { + send_add_group_update(group_key, group); + } else { + send_remove_group_update(group_key, group, vector()); + } + } + + flush_all_pending_updates(true, "on_notification_group_size_max_changed end"); } - max_notification_group_count_ = static_cast(new_max_notification_group_count); + max_notification_group_count_ = new_max_notification_group_count_size_t; + /* + if (is_increased && last_loaded_group_key_ < get_last_updated_group_key()) { + TODO load_notification_groups_from_database(); + } + */ } void NotificationManager::on_notification_group_size_max_changed() { @@ -1151,7 +1222,8 @@ void NotificationManager::on_notification_group_size_max_changed() { CHECK(MIN_NOTIFICATION_GROUP_SIZE_MAX <= new_max_notification_group_size && new_max_notification_group_size <= MAX_NOTIFICATION_GROUP_SIZE_MAX); - if (static_cast(new_max_notification_group_size) == max_notification_group_size_) { + auto new_max_notification_group_size_size_t = static_cast(new_max_notification_group_size); + if (new_max_notification_group_size_size_t == max_notification_group_size_) { return; } @@ -1159,12 +1231,60 @@ void NotificationManager::on_notification_group_size_max_changed() { << new_max_notification_group_size; if (max_notification_group_size_ != 0) { - // TODO + flush_all_pending_notifications(); + flush_all_pending_updates(true, "on_notification_group_size_max_changed"); + + int32 left = max_notification_group_count_; + for (auto it = groups_.begin(); it != groups_.end() && left > 0; ++it, left--) { + auto &group_key = it->first; + auto &group = it->second; + CHECK(group.pending_notifications.empty()); + CHECK(pending_updates_.count(group_key.group_id.get()) == 0); + + vector> added_notifications; + vector removed_notification_ids; + auto notification_count = group.notifications.size(); + if (new_max_notification_group_size_size_t < max_notification_group_size_) { + if (notification_count <= new_max_notification_group_size_size_t) { + VLOG(notifications) << "There is no need to update " << group_key.group_id; + continue; + } + for (size_t i = notification_count - min(notification_count, max_notification_group_size_); + i < notification_count - new_max_notification_group_size_size_t; i++) { + removed_notification_ids.push_back(group.notifications[i].notification_id.get()); + } + CHECK(!removed_notification_ids.empty()); + } else { + if (notification_count <= max_notification_group_size_) { + VLOG(notifications) << "There is no need to update " << group_key.group_id; + continue; + } + for (size_t i = notification_count - min(notification_count, new_max_notification_group_size_size_t); + i < notification_count - max_notification_group_size_; i++) { + added_notifications.push_back(get_notification_object(group_key.dialog_id, group.notifications[i])); + if (added_notifications.back()->type_ == nullptr) { + added_notifications.pop_back(); + } + } + if (new_max_notification_group_size_size_t > notification_count && + static_cast(group.total_count) > notification_count) { + // TODO load more notifications in the group from the message database + } + if (added_notifications.empty()) { + continue; + } + } + auto update = td_api::make_object( + group_key.group_id.get(), group_key.dialog_id.get(), group_key.dialog_id.get(), true, group.total_count, + std::move(added_notifications), std::move(removed_notification_ids)); + VLOG(notifications) << "Send " << as_notification_update(update.get()); + send_closure(G()->td(), &Td::send_update, std::move(update)); + } } - max_notification_group_size_ = static_cast(new_max_notification_group_size); + max_notification_group_size_ = new_max_notification_group_size_size_t; keep_notification_group_size_ = - max_notification_group_size_ + max(EXTRA_GROUP_SIZE / 2, min(max_notification_group_size_, EXTRA_GROUP_SIZE)); + max_notification_group_size_ + clamp(max_notification_group_size_, EXTRA_GROUP_SIZE / 2, EXTRA_GROUP_SIZE); } void NotificationManager::on_online_cloud_timeout_changed() { @@ -1201,21 +1321,7 @@ void NotificationManager::after_get_difference_impl() { } VLOG(notifications) << "After get difference"; - vector ready_group_keys; - for (auto &it : pending_updates_) { - if (running_get_chat_difference_.count(it.first) == 0) { - auto group_it = get_group(NotificationGroupId(it.first)); - CHECK(group_it != groups_.end()); - ready_group_keys.push_back(group_it->first); - } - } - - // flush groups in reverse order to not exceed max_notification_group_count_ - std::sort(ready_group_keys.begin(), ready_group_keys.end()); - for (auto group_key : reversed(ready_group_keys)) { - flush_pending_updates_timeout_.cancel_timeout(group_key.group_id.get()); - flush_pending_updates(group_key.group_id.get(), "after_get_difference"); - } + flush_all_pending_updates(false, "after_get_difference"); } void NotificationManager::before_get_chat_difference(NotificationGroupId group_id) { diff --git a/td/telegram/NotificationManager.h b/td/telegram/NotificationManager.h index 9296d6fa..4eb7fe06 100644 --- a/td/telegram/NotificationManager.h +++ b/td/telegram/NotificationManager.h @@ -164,6 +164,8 @@ class NotificationManager : public Actor { void flush_pending_notifications(NotificationGroupId group_id); + void flush_all_pending_notifications(); + void on_notifications_removed(NotificationGroups::iterator &&group_it, vector> &&added_notifications, vector &&removed_notification_ids); @@ -174,6 +176,8 @@ class NotificationManager : public Actor { void flush_pending_updates(int32 group_id, const char *source); + void flush_all_pending_updates(bool include_delayed_chats, const char *source); + NotificationId current_notification_id_; NotificationGroupId current_notification_group_id_; diff --git a/tdactor/td/actor/Timeout.cpp b/tdactor/td/actor/Timeout.cpp index fa2e5fff..491b684d 100644 --- a/tdactor/td/actor/Timeout.cpp +++ b/tdactor/td/actor/Timeout.cpp @@ -77,20 +77,34 @@ void MultiTimeout::update_timeout() { } } -void MultiTimeout::timeout_expired() { - double now = Time::now_cached(); +vector MultiTimeout::get_expired_keys(double now) { + vector expired_keys; while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { int64 key = static_cast(timeout_queue_.pop())->key; items_.erase(Item(key)); - expired_.push_back(key); + expired_keys.push_back(key); } + return expired_keys; +} + +void MultiTimeout::timeout_expired() { + vector expired_keys = get_expired_keys(Time::now_cached()); if (!items_.empty()) { update_timeout(); } - for (auto key : expired_) { + for (auto key : expired_keys) { + callback_(data_, key); + } +} + +void MultiTimeout::run_all() { + vector expired_keys = get_expired_keys(Time::now_cached() + 1e10); + if (!expired_keys.empty()) { + update_timeout(); + } + for (auto key : expired_keys) { callback_(data_, key); } - expired_.clear(); } } // namespace td diff --git a/tdactor/td/actor/Timeout.h b/tdactor/td/actor/Timeout.h index d08c4702..73b06d82 100644 --- a/tdactor/td/actor/Timeout.h +++ b/tdactor/td/actor/Timeout.h @@ -111,6 +111,8 @@ class MultiTimeout final : public Actor { void cancel_timeout(int64 key); + void run_all(); + private: friend class Scheduler; @@ -119,11 +121,12 @@ class MultiTimeout final : public Actor { KHeap timeout_queue_; std::set items_; - std::vector expired_; void update_timeout(); void timeout_expired() override; + + vector get_expired_keys(double now); }; } // namespace td