Improve flush_pending_updates.

GitOrigin-RevId: 54cc8bddd6654e9972eb4346784c93e84f551ed1
This commit is contained in:
levlam 2018-11-26 15:58:42 +03:00
parent 173ba39ad0
commit 60e685bd7c
3 changed files with 114 additions and 33 deletions

View File

@ -22,7 +22,7 @@ TDLib (Telegram Database library) is a cross-platform library for building [Tele
* **Cross-platform**: `TDLib` can be used on Android, iOS, Windows, macOS, Linux, FreeBSD, Windows Phone, WebAssembly, watchOS, tvOS, Tizen, Cygwin. It should also work on other *nix systems with or without minimal effort.
* **Multilanguage**: `TDLib` can be easily used with any programming language that is able to execute C functions. Additionally it already has native Java (using `JNI`) bindings and .NET (using `C++/CLI` and `C++/CX`) bindings.
* **Easy to use**: `TDLib` takes care of all network implementation details, encryption and local data storage.
* **High-performance**: in the [Telegram Bot API](https://core.telegram.org/bots/api), each `TDLib` instance handles more than 23000 active bots simultaneously.
* **High-performance**: in the [Telegram Bot API](https://core.telegram.org/bots/api), each `TDLib` instance handles more than 24000 active bots simultaneously.
* **Well-documented**: all `TDLib` API methods and public interfaces are fully documented.
* **Consistent**: `TDLib` guarantees that all updates are delivered in the right order.
* **Reliable**: `TDLib` remains stable on slow and unreliable Internet connections.

View File

@ -20316,6 +20316,11 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
}
}
}
if (*need_update && dialog_id.get_type() == DialogType::Channel &&
message->date < G()->unix_time_cached() - 2 * 86400 && Slice(source) == Slice("updateNewChannelMessage")) {
// if the message is pretty old, we might have missed the update that the message has already been read
repair_channel_server_unread_count(d);
}
if (*need_update) {
add_new_message_notification(d, message.get(), false);

View File

@ -352,19 +352,26 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
// we need to keep only additions of notifications from added_notification_ids/edited_notification_ids and
// all edits of notifications from edited_notification_ids
// deletions of a notification can be removed, only if the addition of the notification has already been deleted
// deletions of all unkept notifications can be moved to the first updateNotificationGroup
// after that at every moment there is no more active notifications than in the last moment,
// so left deletions after add/edit can be safely removed and following additions can be treated as edits
// we still need to keep deletions coming first, because we can't have 2 consequent additions
// from all additions of the same notification, we need to preserve the first, because it can be with sound,
// and the last, because it can be after a deletion
// all edits can be merged to first addition/edit after last deletion
// all other additions and edits can be merged to the first addition/edit
// i.e. in edit+delete+add chain we want to remove deletion and merge addition to the edit
bool is_changed = true;
while (is_changed) {
is_changed = false;
size_t cur_pos = 1;
std::unordered_map<int32, size_t> add_notification_pos;
std::unordered_map<int32, size_t> edit_notification_pos;
size_t cur_pos = 0;
std::unordered_map<int32, size_t> first_add_notification_pos;
std::unordered_map<int32, size_t> first_edit_notification_pos;
std::unordered_set<int32> can_be_deleted_notification_ids;
std::vector<int32> moved_deleted_notification_ids;
size_t first_notification_group_pos = 0;
for (auto &update : updates) {
cur_pos++;
if (update == nullptr) {
is_changed = true;
continue;
@ -375,8 +382,9 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
for (auto &notification : update_ptr->added_notifications_) {
auto notification_id = notification->id_;
if (added_notification_ids.count(notification_id) == 0 &&
edited_notification_ids.count(notification_id) == 0) {
bool is_needed =
added_notification_ids.count(notification_id) != 0 || edited_notification_ids.count(notification_id) != 0;
if (!is_needed) {
VLOG(notifications) << "Remove unneeded addition of " << notification_id << " in update " << cur_pos;
can_be_deleted_notification_ids.insert(notification_id);
notification = nullptr;
@ -384,10 +392,41 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
continue;
}
auto &pos = add_notification_pos[notification_id];
CHECK(pos < cur_pos);
pos = cur_pos;
CHECK(edit_notification_pos.count(notification_id) == 0); // there can't be addition just after edit
auto edit_it = first_edit_notification_pos.find(notification_id);
if (edit_it != first_edit_notification_pos.end()) {
VLOG(notifications) << "Move addition of " << notification_id << " in update " << cur_pos
<< " to edit in update " << edit_it->second;
CHECK(edit_it->second < cur_pos);
auto previous_update_ptr = static_cast<td_api::updateNotification *>(updates[edit_it->second - 1].get());
CHECK(previous_update_ptr->notification_->id_ == notification_id);
previous_update_ptr->notification_->type_ = std::move(notification->type_);
is_changed = true;
notification = nullptr;
continue;
}
auto add_it = first_add_notification_pos.find(notification_id);
if (add_it != first_add_notification_pos.end()) {
VLOG(notifications) << "Move addition of " << notification_id << " in update " << cur_pos << " to update "
<< add_it->second;
CHECK(add_it->second < cur_pos);
auto previous_update_ptr =
static_cast<td_api::updateNotificationGroup *>(updates[add_it->second - 1].get());
bool is_found = false;
for (auto &prev_notification : previous_update_ptr->added_notifications_) {
if (prev_notification->id_ == notification_id) {
prev_notification->type_ = std::move(notification->type_);
is_found = true;
break;
}
}
CHECK(is_found);
is_changed = true;
notification = nullptr;
continue;
}
// it is a first addition/edit of needed notification
first_add_notification_pos[notification_id] = cur_pos;
}
update_ptr->added_notifications_.erase(
std::remove_if(update_ptr->added_notifications_.begin(), update_ptr->added_notifications_.end(),
@ -399,21 +438,37 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
}
for (auto &notification_id : update_ptr->removed_notification_ids_) {
bool is_needed =
added_notification_ids.count(notification_id) != 0 || edited_notification_ids.count(notification_id) != 0;
if (can_be_deleted_notification_ids.count(notification_id) == 1) {
CHECK(!is_needed);
VLOG(notifications) << "Remove unneeded deletion of " << notification_id << " in update " << cur_pos;
notification_id = 0;
is_changed = true;
continue;
}
auto edit_it = edit_notification_pos.find(notification_id);
if (edit_it != edit_notification_pos.end()) {
VLOG(notifications) << "Remove unneeded updateNotification " << edit_it->second;
CHECK(edit_it->second < cur_pos);
updates[edit_it->second - 1] = nullptr;
if (!is_needed) {
if (first_notification_group_pos != 0) {
VLOG(notifications) << "Need to keep deletion of " << notification_id << " in update " << cur_pos
<< ", but can move it to the first updateNotificationGroup at pos "
<< first_notification_group_pos;
moved_deleted_notification_ids.push_back(notification_id);
notification_id = 0;
is_changed = true;
edit_notification_pos.erase(edit_it);
}
continue;
}
if (first_add_notification_pos.count(notification_id) != 0 ||
first_edit_notification_pos.count(notification_id) != 0) {
// the notification will be re-added, and we will be able to merge the addition with previous update, so we can just remove the deletion
VLOG(notifications) << "Remove unneeded deletion in update " << cur_pos;
notification_id = 0;
is_changed = true;
continue;
}
// we need to keep the deletion, because otherwise we will have 2 consequent additions
}
update_ptr->removed_notification_ids_.erase(
std::remove_if(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end(),
@ -437,18 +492,24 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
update = nullptr;
}
}
if (first_notification_group_pos == 0 && update != nullptr) {
first_notification_group_pos = cur_pos;
}
} else {
CHECK(update->get_id() == td_api::updateNotification::ID);
auto update_ptr = static_cast<td_api::updateNotification *>(update.get());
auto notification_id = update_ptr->notification_->id_;
if (edited_notification_ids.count(notification_id) == 0) {
bool is_needed =
added_notification_ids.count(notification_id) != 0 || edited_notification_ids.count(notification_id) != 0;
if (!is_needed) {
VLOG(notifications) << "Remove unneeded update " << cur_pos;
is_changed = true;
update = nullptr;
continue;
}
auto edit_it = edit_notification_pos.find(notification_id);
if (edit_it != edit_notification_pos.end()) {
auto edit_it = first_edit_notification_pos.find(notification_id);
if (edit_it != first_edit_notification_pos.end()) {
VLOG(notifications) << "Move edit of " << notification_id << " in update " << cur_pos << " to update "
<< edit_it->second;
CHECK(edit_it->second < cur_pos);
@ -459,28 +520,42 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
update = nullptr;
continue;
}
auto add_it = add_notification_pos.find(notification_id);
if (add_it != add_notification_pos.end()) {
auto add_it = first_add_notification_pos.find(notification_id);
if (add_it != first_add_notification_pos.end()) {
VLOG(notifications) << "Move edit of " << notification_id << " in update " << cur_pos << " to update "
<< edit_it->second;
<< add_it->second;
CHECK(add_it->second < cur_pos);
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[edit_it->second - 1].get());
bool found = false;
auto previous_update_ptr = static_cast<td_api::updateNotificationGroup *>(updates[add_it->second - 1].get());
bool is_found = false;
for (auto &notification : previous_update_ptr->added_notifications_) {
if (notification->id_ == notification_id) {
notification->type_ = std::move(update_ptr->notification_->type_);
found = true;
is_found = true;
break;
}
}
CHECK(found);
CHECK(is_found);
is_changed = true;
update = nullptr;
continue;
}
edit_notification_pos[notification_id] = cur_pos;
// it is a first addition/edit of needed notification
first_edit_notification_pos[notification_id] = cur_pos;
}
cur_pos++;
}
if (!moved_deleted_notification_ids.empty()) {
CHECK(first_notification_group_pos != 0);
auto &update = updates[first_notification_group_pos - 1];
CHECK(update->get_id() == td_api::updateNotificationGroup::ID);
auto update_ptr = static_cast<td_api::updateNotificationGroup *>(update.get());
append(update_ptr->removed_notification_ids_, std::move(moved_deleted_notification_ids));
auto old_size = update_ptr->removed_notification_ids_.size();
std::sort(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end());
update_ptr->removed_notification_ids_.erase(
std::unique(update_ptr->removed_notification_ids_.begin(), update_ptr->removed_notification_ids_.end()),
update_ptr->removed_notification_ids_.end());
CHECK(old_size == update_ptr->removed_notification_ids_.size());
}
updates.erase(std::remove_if(updates.begin(), updates.end(), [](auto &update) { return update == nullptr; }),
@ -686,7 +761,8 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_
group.pending_notifications_flush_time = 0;
group.pending_notifications.clear();
if (group.notifications.size() >
keep_notification_group_size_ + EXTRA_GROUP_SIZE) { // ensure that we delete a lot of messages simultaneously
keep_notification_group_size_ +
EXTRA_GROUP_SIZE) { // ensure that we delete a lot of notifications simultaneously
// keep only keep_notification_group_size_ last notifications in memory
group.notifications.erase(
group.notifications.begin(),