Loading old message notifications from database.

GitOrigin-RevId: dfd63d824dc17efe4dd0e682f0a4adfa5639f273
This commit is contained in:
levlam 2018-12-03 18:38:29 +03:00
parent 2b5be01417
commit 25225d5c4b
4 changed files with 323 additions and 57 deletions

View File

@ -17622,7 +17622,7 @@ MessagesManager::MessageNotificationGroup MessagesManager::get_message_notificat
LOG(ERROR) << "Total notification count is negative in " << d->dialog_id;
result.total_count = 0;
}
result.notifications = get_message_notifications_from_database(
result.notifications = get_message_notifications_from_database_force(
d, NotificationId::max(), static_cast<int32>(td_->notification_manager_->get_max_notification_group_size()));
int32 last_notification_date = 0;
@ -17638,6 +17638,8 @@ MessagesManager::MessageNotificationGroup MessagesManager::get_message_notificat
set_dialog_last_notification(d, last_notification_date, last_notification_id,
"get_message_notification_group_force");
}
std::reverse(result.notifications.begin(), result.notifications.end());
}
return result;
@ -17649,30 +17651,43 @@ bool MessagesManager::is_message_has_active_notification(const Dialog *d, const
(m->message_id.get() > d->last_read_inbox_message_id.get() || m->contains_unread_mention);
}
vector<Notification> MessagesManager::get_message_notifications_from_database(Dialog *d,
NotificationId from_notification_id,
int32 limit) {
vector<Notification> MessagesManager::get_message_notifications_from_database_force(Dialog *d,
NotificationId from_notification_id,
int32 limit) {
CHECK(d != nullptr);
if (!G()->parameters().use_message_db) {
return {};
}
auto result = G()->td_db()->get_messages_db_sync()->get_messages_from_notification_id(d->dialog_id,
from_notification_id, limit);
if (result.is_error()) {
return {};
}
auto messages = result.move_as_ok();
while (true) {
auto result = G()->td_db()->get_messages_db_sync()->get_messages_from_notification_id(d->dialog_id,
from_notification_id, limit);
if (result.is_error()) {
return {};
}
auto messages = result.move_as_ok();
if (messages.empty()) {
return {};
}
vector<Notification> res;
res.reserve(messages.size());
for (auto &message : messages) {
auto m = on_get_message_from_database(d->dialog_id, d, std::move(message));
if (is_message_has_active_notification(d, m)) {
res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id));
vector<Notification> res;
res.reserve(messages.size());
bool is_found = false;
for (auto &message : messages) {
auto m = on_get_message_from_database(d->dialog_id, d, std::move(message));
if (is_message_has_active_notification(d, m)) {
res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id));
}
if (m != nullptr && m->notification_id.is_valid()) {
CHECK(m->notification_id.get() < from_notification_id.get());
from_notification_id = m->notification_id;
is_found = true;
}
}
if (!res.empty() || !is_found) {
return res;
}
}
return res;
}
vector<NotificationGroupKey> MessagesManager::get_message_notification_group_keys_from_database(
@ -17681,7 +17696,7 @@ vector<NotificationGroupKey> MessagesManager::get_message_notification_group_key
return {};
}
VLOG(notifications) << "Load " << limit << " message notification groups from database from date "
VLOG(notifications) << "Trying to load " << limit << " message notification groups from database from date "
<< from_last_notification_date << " and " << from_dialog_id;
Result<std::vector<BufferSlice>> r_dialogs =
@ -17703,11 +17718,82 @@ vector<NotificationGroupKey> MessagesManager::get_message_notification_group_key
CHECK(d->message_notification_group_id.is_valid());
group_keys.emplace_back(d->message_notification_group_id, d->dialog_id, d->last_notification_date);
VLOG(notifications) << "Load " << group_keys.back() << " from database";
VLOG(notifications) << "Loaded " << group_keys.back() << " from database";
}
return group_keys;
}
void MessagesManager::get_message_notifications_from_database(DialogId dialog_id, NotificationId from_notification_id,
int32 limit, Promise<vector<Notification>> promise) {
if (!G()->parameters().use_message_db) {
return promise.set_error(Status::Error(500, "There is no message database"));
}
CHECK(dialog_id.is_valid());
CHECK(limit > 0);
auto d = get_dialog_force(dialog_id);
if (d == nullptr) {
LOG(ERROR) << "Can't find " << dialog_id;
return promise.set_error(Status::Error(500, "Can't find chat"));
}
if (!d->message_notification_group_id.is_valid()) {
return promise.set_value(vector<Notification>());
}
VLOG(notifications) << "Trying to load " << limit << " notifications in " << dialog_id << " from "
<< from_notification_id;
G()->td_db()->get_messages_db_async()->get_messages_from_notification_id(
dialog_id, from_notification_id, limit,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, limit,
promise = std::move(promise)](Result<vector<BufferSlice>> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, limit,
std::move(result), std::move(promise));
}));
}
void MessagesManager::on_get_message_notifications_from_database(DialogId dialog_id, int32 limit,
Result<vector<BufferSlice>> result,
Promise<vector<Notification>> promise) {
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
Dialog *d = get_dialog_force(dialog_id);
CHECK(d != nullptr);
CHECK(d->message_notification_group_id.is_valid());
auto messages = result.move_as_ok();
vector<Notification> res;
res.reserve(messages.size());
NotificationId from_notification_id;
for (auto &message : messages) {
auto m = on_get_message_from_database(dialog_id, d, std::move(message));
if (is_message_has_active_notification(d, m)) {
res.emplace_back(m->notification_id, m->date, create_new_message_notification(m->message_id));
}
if (m != nullptr && m->notification_id.is_valid()) {
CHECK(!from_notification_id.is_valid() || m->notification_id.get() < from_notification_id.get());
from_notification_id = m->notification_id;
}
}
if (!res.empty() || !from_notification_id.is_valid() || static_cast<size_t>(limit) > messages.size()) {
std::reverse(res.begin(), res.end());
return promise.set_value(std::move(res));
}
// try again from adjusted from_notification_id
VLOG(notifications) << "Trying to load " << limit << " notifications in " << dialog_id << " from "
<< from_notification_id;
G()->td_db()->get_messages_db_async()->get_messages_from_notification_id(
dialog_id, from_notification_id, limit,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, limit,
promise = std::move(promise)](Result<vector<BufferSlice>> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, limit,
std::move(result), std::move(promise));
}));
}
void MessagesManager::remove_message_notification(DialogId dialog_id, NotificationId notification_id) {
Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr) {
@ -17736,11 +17822,10 @@ void MessagesManager::remove_message_notification(DialogId dialog_id, Notificati
if (G()->parameters().use_message_db) {
G()->td_db()->get_messages_db_async()->get_messages_from_notification_id(
dialog_id, NotificationId(notification_id.get() + 1), 1,
PromiseCreator::lambda(
[dialog_id, notification_id, actor_id = actor_id(this)](vector<BufferSlice> result) mutable {
send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, notification_id,
std::move(result));
}));
PromiseCreator::lambda([dialog_id, notification_id, actor_id = actor_id(this)](vector<BufferSlice> result) {
send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, notification_id,
std::move(result));
}));
}
}
@ -17867,7 +17952,7 @@ bool MessagesManager::add_new_message_notification(Dialog *d, Message *m, bool f
<< ", but forced to send notification about " << m->message_id << " in "
<< d->dialog_id;
// notification group must be preloaded to guarantee that there is no race between
// get_message_notifications_from_database and new notifications added right now
// get_message_notifications_from_database_force and new notifications added right now
td_->notification_manager_->load_group_force(get_dialog_message_notification_group_id(d));
do {
m->notification_id = td_->notification_manager_->get_next_notification_id();
@ -20190,7 +20275,7 @@ MessagesManager::Message *MessagesManager::get_message_force(Dialog *d, MessageI
return nullptr;
}
LOG(INFO) << "Try to load " << FullMessageId{d->dialog_id, message_id} << " from database";
LOG(INFO) << "Trying to load " << FullMessageId{d->dialog_id, message_id} << " from database";
auto r_value = G()->td_db()->get_messages_db_sync()->get_message({d->dialog_id, message_id});
if (r_value.is_error()) {
return nullptr;

View File

@ -668,6 +668,9 @@ class MessagesManager : public Actor {
vector<NotificationGroupKey> get_message_notification_group_keys_from_database(int32 from_last_notification_date,
DialogId from_dialog_id, int32 limit);
void get_message_notifications_from_database(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<Notification>> promise);
void remove_message_notification(DialogId dialog_id, NotificationId notification_id);
void remove_message_notifications(DialogId dialog_id, NotificationId max_notification_id);
@ -1491,8 +1494,11 @@ class MessagesManager : public Actor {
NotificationGroupId get_dialog_message_notification_group_id(Dialog *d);
vector<Notification> get_message_notifications_from_database(Dialog *d, NotificationId from_notification_id,
int32 limit);
vector<Notification> get_message_notifications_from_database_force(Dialog *d, NotificationId from_notification_id,
int32 limit);
void on_get_message_notifications_from_database(DialogId dialog_id, int32 limit, Result<vector<BufferSlice>> result,
Promise<vector<Notification>> promise);
void do_remove_message_notification(DialogId dialog_id, NotificationId notification_id, vector<BufferSlice> result);

View File

@ -193,8 +193,6 @@ NotificationManager::NotificationGroups::iterator NotificationManager::get_group
}
}
std::reverse(message_group.notifications.begin(), message_group.notifications.end());
NotificationGroup group;
group.total_count = message_group.total_count;
group.notifications = std::move(message_group.notifications);
@ -244,9 +242,178 @@ int32 NotificationManager::load_message_notification_groups_from_database(int32
return result;
}
NotificationId NotificationManager::get_first_notification_id(const NotificationGroup &group) {
if (!group.notifications.empty()) {
return group.notifications[0].notification_id;
}
if (!group.pending_notifications.empty()) {
return group.pending_notifications[0].notification_id;
}
return NotificationId();
}
void NotificationManager::load_message_notifications_from_database(const NotificationGroupKey &group_key,
const NotificationGroup &group) {
// TODO
NotificationGroup &group, size_t desired_size) {
if (!G()->parameters().use_message_db) {
return;
}
if (group.is_loaded_from_database || group.is_being_loaded_from_database) {
return;
}
VLOG(notifications) << "Trying to load up to " << desired_size << " notifications in " << group_key.group_id
<< " with " << group.notifications.size() << " current notifications";
group.is_being_loaded_from_database = true;
CHECK(desired_size > group.notifications.size());
size_t limit = desired_size - group.notifications.size();
auto first_notification_id = get_first_notification_id(group);
auto from_notification_id = first_notification_id.is_valid() ? first_notification_id : NotificationId::max();
send_closure(G()->messages_manager(), &MessagesManager::get_message_notifications_from_database, group_key.dialog_id,
from_notification_id, static_cast<int32>(limit),
PromiseCreator::lambda([actor_id = actor_id(this), group_id = group_key.group_id,
limit](Result<vector<Notification>> r_notifications) {
send_closure_later(actor_id, &NotificationManager::on_get_message_notifications_from_database,
group_id, limit, std::move(r_notifications));
}));
}
void NotificationManager::on_get_message_notifications_from_database(NotificationGroupId group_id, size_t limit,
Result<vector<Notification>> r_notifications) {
auto group_it = get_group(group_id);
CHECK(group_it != groups_.end());
auto &group = group_it->second;
CHECK(group.is_being_loaded_from_database == true);
group.is_being_loaded_from_database = false;
if (r_notifications.is_error()) {
group.is_loaded_from_database = true; // do not try again to load it
return;
}
auto notifications = r_notifications.move_as_ok();
CHECK(limit > 0);
if (notifications.empty()) {
group.is_loaded_from_database = true;
}
auto first_notification_id = get_first_notification_id(group);
if (first_notification_id.is_valid()) {
while (!notifications.empty() && notifications.back().notification_id.get() >= first_notification_id.get()) {
// possible if notifications was added after the database request was sent
notifications.pop_back();
}
}
add_notifications_to_group_begin(std::move(group_it), std::move(notifications));
group_it = get_group(group_id);
CHECK(group_it != groups_.end());
if (max_notification_group_size_ > group_it->second.notifications.size()) {
load_message_notifications_from_database(group_it->first, group_it->second, keep_notification_group_size_);
}
}
void NotificationManager::add_notifications_to_group_begin(NotificationGroups::iterator group_it,
vector<Notification> notifications) {
CHECK(group_it != groups_.end());
if (notifications.empty()) {
return;
}
VLOG(notifications) << "Add to beginning of " << group_it->first << " of size "
<< group_it->second.notifications.size() << ' ' << notifications;
auto group_key = group_it->first;
auto final_group_key = group_key;
for (auto &notification : notifications) {
if (notification.date > final_group_key.last_notification_date) {
final_group_key.last_notification_date = notification.date;
}
}
CHECK(final_group_key.last_notification_date != 0);
bool is_position_changed = final_group_key.last_notification_date != group_key.last_notification_date;
NotificationGroup group = std::move(group_it->second);
if (is_position_changed) {
VLOG(notifications) << "Position of notification group is changed from " << group_key << " to " << final_group_key;
delete_group(std::move(group_it));
}
auto last_group_key = get_last_updated_group_key();
bool was_updated = false;
bool is_updated = false;
if (is_position_changed) {
was_updated = group_key.last_notification_date != 0 && group_key < last_group_key;
is_updated = final_group_key.last_notification_date != 0 && final_group_key < last_group_key;
} else {
was_updated = is_updated = !(last_group_key < group_key);
}
if (!is_updated) {
CHECK(!was_updated);
VLOG(notifications) << "There is no need to send updateNotificationGroup in " << group_key
<< ", because of newer notification groups";
group.notifications.insert(group.notifications.begin(), std::make_move_iterator(notifications.begin()),
std::make_move_iterator(notifications.end()));
} else {
if (!was_updated) {
if (last_group_key.last_notification_date != 0) {
// need to remove last notification group to not exceed max_notification_group_count_
send_remove_group_update(last_group_key, groups_[last_group_key], vector<int32>());
}
send_add_group_update(group_key, group);
}
vector<Notification> new_notifications;
vector<td_api::object_ptr<td_api::notification>> added_notifications;
new_notifications.reserve(notifications.size());
added_notifications.reserve(notifications.size());
for (auto &notification : notifications) {
added_notifications.push_back(get_notification_object(group_key.dialog_id, notification));
if (added_notifications.back()->type_ == nullptr) {
added_notifications.pop_back();
} else {
new_notifications.push_back(std::move(notification));
}
}
notifications = std::move(new_notifications);
size_t old_notification_count = group.notifications.size();
auto updated_notification_count = old_notification_count < max_notification_group_size_
? max_notification_group_size_ - old_notification_count
: 0;
if (added_notifications.size() > updated_notification_count) {
added_notifications.erase(added_notifications.begin(), added_notifications.end() - updated_notification_count);
}
auto new_notification_count = old_notification_count < keep_notification_group_size_
? keep_notification_group_size_ - old_notification_count
: 0;
if (new_notification_count > notifications.size()) {
new_notification_count = notifications.size();
}
if (new_notification_count != 0) {
VLOG(notifications) << "Add " << new_notification_count << " notifications to " << group_key.group_id
<< " with current size " << group.notifications.size();
group.notifications.insert(group.notifications.begin(),
std::make_move_iterator(notifications.end() - new_notification_count),
std::make_move_iterator(notifications.end()));
}
if (!added_notifications.empty()) {
add_update_notification_group(td_api::make_object<td_api::updateNotificationGroup>(
group_key.group_id.get(), group_key.dialog_id.get(), 0, true, group.total_count,
std::move(added_notifications), vector<int32>()));
}
}
if (is_position_changed) {
add_group(std::move(final_group_key), std::move(group));
} else {
group_it->second = std::move(group);
}
}
size_t NotificationManager::get_max_notification_group_size() const {
@ -795,9 +962,7 @@ void NotificationManager::do_flush_pending_notifications(NotificationGroupKey &g
}
group.total_count += narrow_cast<int32>(added_notifications.size());
if (added_notifications.size() > max_notification_group_size_) {
added_notifications.erase(
added_notifications.begin(),
added_notifications.begin() + (added_notifications.size() - max_notification_group_size_));
added_notifications.erase(added_notifications.begin(), added_notifications.end() - max_notification_group_size_);
}
vector<int32> removed_notification_ids;
@ -893,6 +1058,7 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_
CHECK(!was_updated);
VLOG(notifications) << "There is no need to send updateNotificationGroup in " << group_key
<< ", because of newer notification groups";
group.total_count += narrow_cast<int32>(group.pending_notifications.size());
for (auto &pending_notification : group.pending_notifications) {
group.notifications.emplace_back(pending_notification.notification_id, pending_notification.date,
std::move(pending_notification.type));
@ -900,7 +1066,7 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_
} else {
if (!was_updated) {
if (last_group_key.last_notification_date != 0) {
// need to remove last notification group to not exceed max_notification_group_size_
// need to remove last notification group to not exceed max_notification_group_count_
send_remove_group_update(last_group_key, groups_[last_group_key], vector<int32>());
}
send_add_group_update(group_key, group);
@ -925,13 +1091,10 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_
group.pending_notifications_flush_time = 0;
group.pending_notifications.clear();
if (group.notifications.size() >
keep_notification_group_size_ +
EXTRA_GROUP_SIZE) { // ensure that we delete a lot of notifications simultaneously
// if we can delete a lot of notifications simultaneously
if (group.notifications.size() > keep_notification_group_size_ + EXTRA_GROUP_SIZE) {
// keep only keep_notification_group_size_ last notifications in memory
group.notifications.erase(
group.notifications.begin(),
group.notifications.begin() + (group.notifications.size() - keep_notification_group_size_));
group.notifications.erase(group.notifications.begin(), group.notifications.end() - keep_notification_group_size_);
}
add_group(std::move(final_group_key), std::move(group));
@ -1144,6 +1307,13 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
}
}
if (is_permanent) {
group_it->second.total_count--;
}
if (is_found) {
group_it->second.notifications.erase(group_it->second.notifications.begin() + notification_pos);
}
vector<td_api::object_ptr<td_api::notification>> added_notifications;
vector<int32> removed_notification_ids;
if (is_found && notification_pos + max_notification_group_size_ >= old_group_size) {
@ -1156,17 +1326,10 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
added_notifications.pop_back();
}
} else {
load_message_notifications_from_database(group_it->first, group_it->second);
load_message_notifications_from_database(group_it->first, group_it->second, keep_notification_group_size_);
}
}
if (is_permanent) {
group_it->second.total_count--;
}
if (is_found) {
group_it->second.notifications.erase(group_it->second.notifications.begin() + notification_pos);
}
if (is_permanent || !removed_notification_ids.empty()) {
on_notifications_removed(std::move(group_it), std::move(added_notifications), std::move(removed_notification_ids));
}
@ -1353,6 +1516,10 @@ void NotificationManager::on_notification_group_size_max_changed() {
return;
}
auto new_keep_notification_group_size =
new_max_notification_group_size_size_t +
clamp(new_max_notification_group_size_size_t, EXTRA_GROUP_SIZE / 2, EXTRA_GROUP_SIZE);
VLOG(notifications) << "Change max notification group size from " << max_notification_group_size_ << " to "
<< new_max_notification_group_size;
@ -1381,6 +1548,9 @@ void NotificationManager::on_notification_group_size_max_changed() {
}
CHECK(!removed_notification_ids.empty());
} else {
if (new_max_notification_group_size_size_t > notification_count) {
load_message_notifications_from_database(group_key, group, new_keep_notification_group_size);
}
if (notification_count <= max_notification_group_size_) {
VLOG(notifications) << "There is no need to update " << group_key.group_id;
continue;
@ -1392,10 +1562,6 @@ void NotificationManager::on_notification_group_size_max_changed() {
added_notifications.pop_back();
}
}
if (new_max_notification_group_size_size_t > notification_count &&
static_cast<size_t>(group.total_count) > notification_count && G()->parameters().use_message_db) {
load_message_notifications_from_database(group_key, group);
}
if (added_notifications.empty()) {
continue;
}
@ -1409,8 +1575,7 @@ void NotificationManager::on_notification_group_size_max_changed() {
}
max_notification_group_size_ = new_max_notification_group_size_size_t;
keep_notification_group_size_ =
max_notification_group_size_ + clamp(max_notification_group_size_, EXTRA_GROUP_SIZE / 2, EXTRA_GROUP_SIZE);
keep_notification_group_size_ = new_keep_notification_group_size;
}
void NotificationManager::on_online_cloud_timeout_changed() {

View File

@ -106,6 +106,8 @@ class NotificationManager : public Actor {
struct NotificationGroup {
int32 total_count = 0;
bool is_loaded_from_database = false;
bool is_being_loaded_from_database = false;
vector<Notification> notifications;
@ -139,9 +141,17 @@ class NotificationManager : public Actor {
void delete_group(NotificationGroups::iterator &&group_it);
static NotificationId get_first_notification_id(const NotificationGroup &group);
int32 load_message_notification_groups_from_database(int32 limit, bool send_update);
void load_message_notifications_from_database(const NotificationGroupKey &group_key, const NotificationGroup &group);
void load_message_notifications_from_database(const NotificationGroupKey &group_key, NotificationGroup &group,
size_t desired_size);
void on_get_message_notifications_from_database(NotificationGroupId group_id, size_t limit,
Result<vector<Notification>> r_notifications);
void add_notifications_to_group_begin(NotificationGroups::iterator group_it, vector<Notification> notifications);
NotificationGroupKey get_last_updated_group_key() const;