diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 17f1f257..ea530a62 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -17622,7 +17622,7 @@ MessagesManager::MessageNotificationGroup MessagesManager::get_message_notificat LOG(ERROR) << "Total notification count is negative in " << d->dialog_id; result.total_count = 0; } - result.notifications = get_message_notifications_from_database( + result.notifications = get_message_notifications_from_database_force( d, NotificationId::max(), static_cast(td_->notification_manager_->get_max_notification_group_size())); int32 last_notification_date = 0; @@ -17638,6 +17638,8 @@ MessagesManager::MessageNotificationGroup MessagesManager::get_message_notificat set_dialog_last_notification(d, last_notification_date, last_notification_id, "get_message_notification_group_force"); } + + std::reverse(result.notifications.begin(), result.notifications.end()); } return result; @@ -17649,30 +17651,43 @@ bool MessagesManager::is_message_has_active_notification(const Dialog *d, const (m->message_id.get() > d->last_read_inbox_message_id.get() || m->contains_unread_mention); } -vector MessagesManager::get_message_notifications_from_database(Dialog *d, - NotificationId from_notification_id, - int32 limit) { +vector MessagesManager::get_message_notifications_from_database_force(Dialog *d, + NotificationId from_notification_id, + int32 limit) { CHECK(d != nullptr); if (!G()->parameters().use_message_db) { return {}; } - auto result = G()->td_db()->get_messages_db_sync()->get_messages_from_notification_id(d->dialog_id, - from_notification_id, limit); - if (result.is_error()) { - return {}; - } - auto messages = result.move_as_ok(); + while (true) { + auto result = G()->td_db()->get_messages_db_sync()->get_messages_from_notification_id(d->dialog_id, + from_notification_id, limit); + if (result.is_error()) { + return {}; + } + auto messages = result.move_as_ok(); + if (messages.empty()) { + return {}; + } - vector res; - res.reserve(messages.size()); - for (auto &message : messages) { - auto m = on_get_message_from_database(d->dialog_id, d, std::move(message)); - if (is_message_has_active_notification(d, m)) { - res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id)); + vector res; + res.reserve(messages.size()); + bool is_found = false; + for (auto &message : messages) { + auto m = on_get_message_from_database(d->dialog_id, d, std::move(message)); + if (is_message_has_active_notification(d, m)) { + res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id)); + } + if (m != nullptr && m->notification_id.is_valid()) { + CHECK(m->notification_id.get() < from_notification_id.get()); + from_notification_id = m->notification_id; + is_found = true; + } + } + if (!res.empty() || !is_found) { + return res; } } - return res; } vector MessagesManager::get_message_notification_group_keys_from_database( @@ -17681,7 +17696,7 @@ vector MessagesManager::get_message_notification_group_key return {}; } - VLOG(notifications) << "Load " << limit << " message notification groups from database from date " + VLOG(notifications) << "Trying to load " << limit << " message notification groups from database from date " << from_last_notification_date << " and " << from_dialog_id; Result> r_dialogs = @@ -17703,11 +17718,82 @@ vector MessagesManager::get_message_notification_group_key CHECK(d->message_notification_group_id.is_valid()); group_keys.emplace_back(d->message_notification_group_id, d->dialog_id, d->last_notification_date); - VLOG(notifications) << "Load " << group_keys.back() << " from database"; + VLOG(notifications) << "Loaded " << group_keys.back() << " from database"; } return group_keys; } +void MessagesManager::get_message_notifications_from_database(DialogId dialog_id, NotificationId from_notification_id, + int32 limit, Promise> promise) { + if (!G()->parameters().use_message_db) { + return promise.set_error(Status::Error(500, "There is no message database")); + } + + CHECK(dialog_id.is_valid()); + CHECK(limit > 0); + + auto d = get_dialog_force(dialog_id); + if (d == nullptr) { + LOG(ERROR) << "Can't find " << dialog_id; + return promise.set_error(Status::Error(500, "Can't find chat")); + } + if (!d->message_notification_group_id.is_valid()) { + return promise.set_value(vector()); + } + + VLOG(notifications) << "Trying to load " << limit << " notifications in " << dialog_id << " from " + << from_notification_id; + G()->td_db()->get_messages_db_async()->get_messages_from_notification_id( + dialog_id, from_notification_id, limit, + PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, limit, + promise = std::move(promise)](Result> result) mutable { + send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, limit, + std::move(result), std::move(promise)); + })); +} + +void MessagesManager::on_get_message_notifications_from_database(DialogId dialog_id, int32 limit, + Result> result, + Promise> promise) { + if (result.is_error()) { + return promise.set_error(result.move_as_error()); + } + + Dialog *d = get_dialog_force(dialog_id); + CHECK(d != nullptr); + CHECK(d->message_notification_group_id.is_valid()); + + auto messages = result.move_as_ok(); + vector res; + res.reserve(messages.size()); + NotificationId from_notification_id; + for (auto &message : messages) { + auto m = on_get_message_from_database(dialog_id, d, std::move(message)); + if (is_message_has_active_notification(d, m)) { + res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id)); + } + if (m != nullptr && m->notification_id.is_valid()) { + CHECK(!from_notification_id.is_valid() || m->notification_id.get() < from_notification_id.get()); + from_notification_id = m->notification_id; + } + } + if (!res.empty() || !from_notification_id.is_valid() || static_cast(limit) > messages.size()) { + std::reverse(res.begin(), res.end()); + return promise.set_value(std::move(res)); + } + + // try again from adjusted from_notification_id + VLOG(notifications) << "Trying to load " << limit << " notifications in " << dialog_id << " from " + << from_notification_id; + G()->td_db()->get_messages_db_async()->get_messages_from_notification_id( + dialog_id, from_notification_id, limit, + PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, limit, + promise = std::move(promise)](Result> result) mutable { + send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, limit, + std::move(result), std::move(promise)); + })); +} + void MessagesManager::remove_message_notification(DialogId dialog_id, NotificationId notification_id) { Dialog *d = get_dialog_force(dialog_id); if (d == nullptr) { @@ -17736,11 +17822,10 @@ void MessagesManager::remove_message_notification(DialogId dialog_id, Notificati if (G()->parameters().use_message_db) { G()->td_db()->get_messages_db_async()->get_messages_from_notification_id( dialog_id, NotificationId(notification_id.get() + 1), 1, - PromiseCreator::lambda( - [dialog_id, notification_id, actor_id = actor_id(this)](vector result) mutable { - send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, notification_id, - std::move(result)); - })); + PromiseCreator::lambda([dialog_id, notification_id, actor_id = actor_id(this)](vector result) { + send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, notification_id, + std::move(result)); + })); } } @@ -17867,7 +17952,7 @@ bool MessagesManager::add_new_message_notification(Dialog *d, Message *m, bool f << ", but forced to send notification about " << m->message_id << " in " << d->dialog_id; // notification group must be preloaded to guarantee that there is no race between - // get_message_notifications_from_database and new notifications added right now + // get_message_notifications_from_database_force and new notifications added right now td_->notification_manager_->load_group_force(get_dialog_message_notification_group_id(d)); do { m->notification_id = td_->notification_manager_->get_next_notification_id(); @@ -20190,7 +20275,7 @@ MessagesManager::Message *MessagesManager::get_message_force(Dialog *d, MessageI return nullptr; } - LOG(INFO) << "Try to load " << FullMessageId{d->dialog_id, message_id} << " from database"; + LOG(INFO) << "Trying to load " << FullMessageId{d->dialog_id, message_id} << " from database"; auto r_value = G()->td_db()->get_messages_db_sync()->get_message({d->dialog_id, message_id}); if (r_value.is_error()) { return nullptr; diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 28cd5701..8aaa7257 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -668,6 +668,9 @@ class MessagesManager : public Actor { vector get_message_notification_group_keys_from_database(int32 from_last_notification_date, DialogId from_dialog_id, int32 limit); + void get_message_notifications_from_database(DialogId dialog_id, NotificationId from_notification_id, int32 limit, + Promise> promise); + void remove_message_notification(DialogId dialog_id, NotificationId notification_id); void remove_message_notifications(DialogId dialog_id, NotificationId max_notification_id); @@ -1491,8 +1494,11 @@ class MessagesManager : public Actor { NotificationGroupId get_dialog_message_notification_group_id(Dialog *d); - vector get_message_notifications_from_database(Dialog *d, NotificationId from_notification_id, - int32 limit); + vector get_message_notifications_from_database_force(Dialog *d, NotificationId from_notification_id, + int32 limit); + + void on_get_message_notifications_from_database(DialogId dialog_id, int32 limit, Result> result, + Promise> promise); void do_remove_message_notification(DialogId dialog_id, NotificationId notification_id, vector result); diff --git a/td/telegram/NotificationManager.cpp b/td/telegram/NotificationManager.cpp index 6a699e5c..0fe9c394 100644 --- a/td/telegram/NotificationManager.cpp +++ b/td/telegram/NotificationManager.cpp @@ -193,8 +193,6 @@ NotificationManager::NotificationGroups::iterator NotificationManager::get_group } } - std::reverse(message_group.notifications.begin(), message_group.notifications.end()); - NotificationGroup group; group.total_count = message_group.total_count; group.notifications = std::move(message_group.notifications); @@ -244,9 +242,178 @@ int32 NotificationManager::load_message_notification_groups_from_database(int32 return result; } +NotificationId NotificationManager::get_first_notification_id(const NotificationGroup &group) { + if (!group.notifications.empty()) { + return group.notifications[0].notification_id; + } + if (!group.pending_notifications.empty()) { + return group.pending_notifications[0].notification_id; + } + return NotificationId(); +} + void NotificationManager::load_message_notifications_from_database(const NotificationGroupKey &group_key, - const NotificationGroup &group) { - // TODO + NotificationGroup &group, size_t desired_size) { + if (!G()->parameters().use_message_db) { + return; + } + if (group.is_loaded_from_database || group.is_being_loaded_from_database) { + return; + } + + VLOG(notifications) << "Trying to load up to " << desired_size << " notifications in " << group_key.group_id + << " with " << group.notifications.size() << " current notifications"; + + group.is_being_loaded_from_database = true; + + CHECK(desired_size > group.notifications.size()); + size_t limit = desired_size - group.notifications.size(); + auto first_notification_id = get_first_notification_id(group); + auto from_notification_id = first_notification_id.is_valid() ? first_notification_id : NotificationId::max(); + send_closure(G()->messages_manager(), &MessagesManager::get_message_notifications_from_database, group_key.dialog_id, + from_notification_id, static_cast(limit), + PromiseCreator::lambda([actor_id = actor_id(this), group_id = group_key.group_id, + limit](Result> r_notifications) { + send_closure_later(actor_id, &NotificationManager::on_get_message_notifications_from_database, + group_id, limit, std::move(r_notifications)); + })); +} + +void NotificationManager::on_get_message_notifications_from_database(NotificationGroupId group_id, size_t limit, + Result> r_notifications) { + auto group_it = get_group(group_id); + CHECK(group_it != groups_.end()); + auto &group = group_it->second; + CHECK(group.is_being_loaded_from_database == true); + group.is_being_loaded_from_database = false; + + if (r_notifications.is_error()) { + group.is_loaded_from_database = true; // do not try again to load it + return; + } + auto notifications = r_notifications.move_as_ok(); + + CHECK(limit > 0); + if (notifications.empty()) { + group.is_loaded_from_database = true; + } + + auto first_notification_id = get_first_notification_id(group); + if (first_notification_id.is_valid()) { + while (!notifications.empty() && notifications.back().notification_id.get() >= first_notification_id.get()) { + // possible if notifications was added after the database request was sent + notifications.pop_back(); + } + } + + add_notifications_to_group_begin(std::move(group_it), std::move(notifications)); + + group_it = get_group(group_id); + CHECK(group_it != groups_.end()); + if (max_notification_group_size_ > group_it->second.notifications.size()) { + load_message_notifications_from_database(group_it->first, group_it->second, keep_notification_group_size_); + } +} + +void NotificationManager::add_notifications_to_group_begin(NotificationGroups::iterator group_it, + vector notifications) { + CHECK(group_it != groups_.end()); + + if (notifications.empty()) { + return; + } + VLOG(notifications) << "Add to beginning of " << group_it->first << " of size " + << group_it->second.notifications.size() << ' ' << notifications; + + auto group_key = group_it->first; + auto final_group_key = group_key; + for (auto ¬ification : notifications) { + if (notification.date > final_group_key.last_notification_date) { + final_group_key.last_notification_date = notification.date; + } + } + CHECK(final_group_key.last_notification_date != 0); + + bool is_position_changed = final_group_key.last_notification_date != group_key.last_notification_date; + + NotificationGroup group = std::move(group_it->second); + if (is_position_changed) { + VLOG(notifications) << "Position of notification group is changed from " << group_key << " to " << final_group_key; + delete_group(std::move(group_it)); + } + + auto last_group_key = get_last_updated_group_key(); + bool was_updated = false; + bool is_updated = false; + if (is_position_changed) { + was_updated = group_key.last_notification_date != 0 && group_key < last_group_key; + is_updated = final_group_key.last_notification_date != 0 && final_group_key < last_group_key; + } else { + was_updated = is_updated = !(last_group_key < group_key); + } + + if (!is_updated) { + CHECK(!was_updated); + VLOG(notifications) << "There is no need to send updateNotificationGroup in " << group_key + << ", because of newer notification groups"; + group.notifications.insert(group.notifications.begin(), std::make_move_iterator(notifications.begin()), + std::make_move_iterator(notifications.end())); + } else { + if (!was_updated) { + if (last_group_key.last_notification_date != 0) { + // need to remove last notification group to not exceed max_notification_group_count_ + send_remove_group_update(last_group_key, groups_[last_group_key], vector()); + } + send_add_group_update(group_key, group); + } + + vector new_notifications; + vector> added_notifications; + new_notifications.reserve(notifications.size()); + added_notifications.reserve(notifications.size()); + for (auto ¬ification : notifications) { + added_notifications.push_back(get_notification_object(group_key.dialog_id, notification)); + if (added_notifications.back()->type_ == nullptr) { + added_notifications.pop_back(); + } else { + new_notifications.push_back(std::move(notification)); + } + } + notifications = std::move(new_notifications); + + size_t old_notification_count = group.notifications.size(); + auto updated_notification_count = old_notification_count < max_notification_group_size_ + ? max_notification_group_size_ - old_notification_count + : 0; + if (added_notifications.size() > updated_notification_count) { + added_notifications.erase(added_notifications.begin(), added_notifications.end() - updated_notification_count); + } + auto new_notification_count = old_notification_count < keep_notification_group_size_ + ? keep_notification_group_size_ - old_notification_count + : 0; + if (new_notification_count > notifications.size()) { + new_notification_count = notifications.size(); + } + if (new_notification_count != 0) { + VLOG(notifications) << "Add " << new_notification_count << " notifications to " << group_key.group_id + << " with current size " << group.notifications.size(); + group.notifications.insert(group.notifications.begin(), + std::make_move_iterator(notifications.end() - new_notification_count), + std::make_move_iterator(notifications.end())); + } + + if (!added_notifications.empty()) { + add_update_notification_group(td_api::make_object( + group_key.group_id.get(), group_key.dialog_id.get(), 0, true, group.total_count, + std::move(added_notifications), vector())); + } + } + + if (is_position_changed) { + add_group(std::move(final_group_key), std::move(group)); + } else { + group_it->second = std::move(group); + } } size_t NotificationManager::get_max_notification_group_size() const { @@ -795,9 +962,7 @@ void NotificationManager::do_flush_pending_notifications(NotificationGroupKey &g } group.total_count += narrow_cast(added_notifications.size()); if (added_notifications.size() > max_notification_group_size_) { - added_notifications.erase( - added_notifications.begin(), - added_notifications.begin() + (added_notifications.size() - max_notification_group_size_)); + added_notifications.erase(added_notifications.begin(), added_notifications.end() - max_notification_group_size_); } vector removed_notification_ids; @@ -893,6 +1058,7 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_ CHECK(!was_updated); VLOG(notifications) << "There is no need to send updateNotificationGroup in " << group_key << ", because of newer notification groups"; + group.total_count += narrow_cast(group.pending_notifications.size()); for (auto &pending_notification : group.pending_notifications) { group.notifications.emplace_back(pending_notification.notification_id, pending_notification.date, std::move(pending_notification.type)); @@ -900,7 +1066,7 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_ } else { if (!was_updated) { if (last_group_key.last_notification_date != 0) { - // need to remove last notification group to not exceed max_notification_group_size_ + // need to remove last notification group to not exceed max_notification_group_count_ send_remove_group_update(last_group_key, groups_[last_group_key], vector()); } send_add_group_update(group_key, group); @@ -925,13 +1091,10 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_ group.pending_notifications_flush_time = 0; group.pending_notifications.clear(); - if (group.notifications.size() > - keep_notification_group_size_ + - EXTRA_GROUP_SIZE) { // ensure that we delete a lot of notifications simultaneously + // if we can delete a lot of notifications simultaneously + if (group.notifications.size() > keep_notification_group_size_ + EXTRA_GROUP_SIZE) { // keep only keep_notification_group_size_ last notifications in memory - group.notifications.erase( - group.notifications.begin(), - group.notifications.begin() + (group.notifications.size() - keep_notification_group_size_)); + group.notifications.erase(group.notifications.begin(), group.notifications.end() - keep_notification_group_size_); } add_group(std::move(final_group_key), std::move(group)); @@ -1144,6 +1307,13 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti } } + if (is_permanent) { + group_it->second.total_count--; + } + if (is_found) { + group_it->second.notifications.erase(group_it->second.notifications.begin() + notification_pos); + } + vector> added_notifications; vector removed_notification_ids; if (is_found && notification_pos + max_notification_group_size_ >= old_group_size) { @@ -1156,17 +1326,10 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti added_notifications.pop_back(); } } else { - load_message_notifications_from_database(group_it->first, group_it->second); + load_message_notifications_from_database(group_it->first, group_it->second, keep_notification_group_size_); } } - if (is_permanent) { - group_it->second.total_count--; - } - if (is_found) { - group_it->second.notifications.erase(group_it->second.notifications.begin() + notification_pos); - } - if (is_permanent || !removed_notification_ids.empty()) { on_notifications_removed(std::move(group_it), std::move(added_notifications), std::move(removed_notification_ids)); } @@ -1353,6 +1516,10 @@ void NotificationManager::on_notification_group_size_max_changed() { return; } + auto new_keep_notification_group_size = + new_max_notification_group_size_size_t + + clamp(new_max_notification_group_size_size_t, EXTRA_GROUP_SIZE / 2, EXTRA_GROUP_SIZE); + VLOG(notifications) << "Change max notification group size from " << max_notification_group_size_ << " to " << new_max_notification_group_size; @@ -1381,6 +1548,9 @@ void NotificationManager::on_notification_group_size_max_changed() { } CHECK(!removed_notification_ids.empty()); } else { + if (new_max_notification_group_size_size_t > notification_count) { + load_message_notifications_from_database(group_key, group, new_keep_notification_group_size); + } if (notification_count <= max_notification_group_size_) { VLOG(notifications) << "There is no need to update " << group_key.group_id; continue; @@ -1392,10 +1562,6 @@ void NotificationManager::on_notification_group_size_max_changed() { added_notifications.pop_back(); } } - if (new_max_notification_group_size_size_t > notification_count && - static_cast(group.total_count) > notification_count && G()->parameters().use_message_db) { - load_message_notifications_from_database(group_key, group); - } if (added_notifications.empty()) { continue; } @@ -1409,8 +1575,7 @@ void NotificationManager::on_notification_group_size_max_changed() { } max_notification_group_size_ = new_max_notification_group_size_size_t; - keep_notification_group_size_ = - max_notification_group_size_ + clamp(max_notification_group_size_, EXTRA_GROUP_SIZE / 2, EXTRA_GROUP_SIZE); + keep_notification_group_size_ = new_keep_notification_group_size; } void NotificationManager::on_online_cloud_timeout_changed() { diff --git a/td/telegram/NotificationManager.h b/td/telegram/NotificationManager.h index 2bddc954..75c501ff 100644 --- a/td/telegram/NotificationManager.h +++ b/td/telegram/NotificationManager.h @@ -106,6 +106,8 @@ class NotificationManager : public Actor { struct NotificationGroup { int32 total_count = 0; + bool is_loaded_from_database = false; + bool is_being_loaded_from_database = false; vector notifications; @@ -139,9 +141,17 @@ class NotificationManager : public Actor { void delete_group(NotificationGroups::iterator &&group_it); + static NotificationId get_first_notification_id(const NotificationGroup &group); + int32 load_message_notification_groups_from_database(int32 limit, bool send_update); - void load_message_notifications_from_database(const NotificationGroupKey &group_key, const NotificationGroup &group); + void load_message_notifications_from_database(const NotificationGroupKey &group_key, NotificationGroup &group, + size_t desired_size); + + void on_get_message_notifications_from_database(NotificationGroupId group_id, size_t limit, + Result> r_notifications); + + void add_notifications_to_group_begin(NotificationGroups::iterator group_it, vector notifications); NotificationGroupKey get_last_updated_group_key() const;