Add support for notifications about new calls.

GitOrigin-RevId: fc4f9c1a484832241f2059a1ec58af6e5abcb198
This commit is contained in:
levlam 2018-12-18 23:59:35 +03:00
parent f073317738
commit e586b31a87
8 changed files with 224 additions and 21 deletions

View File

@ -19,6 +19,7 @@
#include "td/telegram/misc.h"
#include "td/telegram/net/NetQueryCreator.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/NotificationManager.h"
#include "td/telegram/Td.h"
#include "td/telegram/UpdatesManager.h"
@ -618,6 +619,22 @@ void CallActor::on_discard_query_result(NetQueryPtr net_query) {
void CallActor::flush_call_state() {
if (call_state_need_flush_) {
if (!is_outgoing_) {
if (call_state_.type == CallState::Type::Pending) {
if (!has_notification_) {
has_notification_ = true;
send_closure(G()->notification_manager(), &NotificationManager::add_call_notification,
DialogId(UserId(call_admin_id_)), local_call_id_);
}
} else {
if (has_notification_) {
has_notification_ = false;
send_closure(G()->notification_manager(), &NotificationManager::remove_call_notification,
DialogId(UserId(call_admin_id_)), local_call_id_);
}
}
}
if (call_state_.type == CallState::Type::Ready && !call_state_has_config_) {
return;
}

View File

@ -119,6 +119,7 @@ class CallActor : public NetQueryCallback {
CallId local_call_id_;
int64 call_id_{0};
bool is_call_id_inited_{false};
bool has_notification_{false};
int64 call_access_hash_{0};
int32 call_admin_id_{0};
int32 call_participant_id_{0};

View File

@ -10407,8 +10407,12 @@ void MessagesManager::remove_message_notification_id(Dialog *d, Message *m, bool
fix_dialog_last_notification_id(d, m->message_id);
}
if (is_permanent) {
send_closure_later(G()->notification_manager(), &NotificationManager::remove_notification,
d->message_notification_group_id, notification_id, true, Promise<Unit>());
if (m->notification_id.get() > d->max_removed_notification_id.get()) {
send_closure_later(G()->notification_manager(), &NotificationManager::remove_notification,
d->message_notification_group_id, notification_id, true, Promise<Unit>());
}
// on_message_changed will be called by the caller
// don't need to call there to not save twice/or to save just deleted message
} else {
on_message_changed(d, m, false, "remove_message_notification_id");
}
@ -17805,7 +17809,8 @@ void MessagesManager::on_get_message_notifications_from_database(DialogId dialog
res.reserve(messages.size());
NotificationId from_notification_id;
MessageId from_message_id;
VLOG(notifications) << "Loaded " << messages.size() << " messages with notifications from database";
VLOG(notifications) << "Loaded " << messages.size() << " messages with notifications in " << dialog_id
<< " from database";
for (auto &message : messages) {
auto m = on_get_message_from_database(dialog_id, d, std::move(message));
if (m == nullptr || !m->notification_id.is_valid()) {

View File

@ -70,7 +70,7 @@ void NotificationManager::on_flush_pending_updates_timeout_callback(void *notifi
}
bool NotificationManager::is_disabled() const {
return td_->auth_manager_->is_bot();
return td_->auth_manager_->is_bot() || G()->close_flag();
}
namespace {
@ -133,6 +133,17 @@ void NotificationManager::start_up() {
send_closure(G()->td(), &Td::send_update, std::move(update));
}
}
auto call_notification_group_ids_string = G()->td_db()->get_binlog_pmc()->get("notification_call_group_ids");
if (!call_notification_group_ids_string.empty()) {
call_notification_group_ids_ = transform(full_split(call_notification_group_ids_string, ','), [](Slice str) {
return NotificationGroupId{to_integer_safe<int32>(str).ok()};
});
VLOG(notifications) << "Load call_notification_group_ids_ = " << call_notification_group_ids_;
for (auto &group_id : call_notification_group_ids_) {
available_call_notification_group_ids_.insert(group_id);
}
}
}
td_api::object_ptr<td_api::updateActiveNotifications> NotificationManager::get_update_active_notificaitons() const {
@ -198,6 +209,11 @@ NotificationManager::NotificationGroups::iterator NotificationManager::get_group
return group_it;
}
if (std::find(call_notification_group_ids_.begin(), call_notification_group_ids_.end(), group_id) !=
call_notification_group_ids_.end()) {
return groups_.end();
}
auto message_group = td_->messages_manager_->get_message_notification_group_force(group_id);
if (!message_group.dialog_id.is_valid()) {
return groups_.end();
@ -286,7 +302,7 @@ void NotificationManager::load_message_notifications_from_database(const Notific
if (!G()->parameters().use_message_db) {
return;
}
if (group.is_loaded_from_database || group.is_being_loaded_from_database) {
if (group.is_loaded_from_database || group.is_being_loaded_from_database || !group.contains_messages) {
return;
}
if (group.total_count == 0) {
@ -579,8 +595,13 @@ void NotificationManager::add_notification(NotificationGroupId group_id, DialogI
auto group_it = get_group_force(group_id);
if (group_it == groups_.end()) {
group_it = add_group(NotificationGroupKey(group_id, dialog_id, 0), NotificationGroup());
if (type->get_call_id().is_valid()) {
group_it->second.contains_messages = false;
}
}
CHECK(group_it->second.contains_messages == !type->get_call_id().is_valid());
PendingNotification notification;
notification.date = date;
notification.settings_dialog_id = notification_settings_dialog_id;
@ -1106,7 +1127,6 @@ void NotificationManager::send_add_group_update(const NotificationGroupKey &grou
void NotificationManager::flush_pending_notifications(NotificationGroupId group_id) {
auto group_it = get_group(group_id);
if (group_it == groups_.end()) {
CHECK(group_id.get() > current_notification_group_id_.get());
return;
}
@ -1172,7 +1192,7 @@ void NotificationManager::flush_pending_notifications(NotificationGroupId group_
group.pending_notifications_flush_time = 0;
group.pending_notifications.clear();
// if we can delete a lot of notifications simultaneously
if (group.notifications.size() > keep_notification_group_size_ + EXTRA_GROUP_SIZE) {
if (group.notifications.size() > keep_notification_group_size_ + EXTRA_GROUP_SIZE && group.contains_messages) {
// keep only keep_notification_group_size_ last notifications in memory
group.notifications.erase(group.notifications.begin(), group.notifications.end() - keep_notification_group_size_);
group.is_loaded_from_database = false;
@ -1291,8 +1311,8 @@ void NotificationManager::on_notifications_removed(
}
if (last_loaded_notification_group_key_ < last_group_key) {
load_message_notification_groups_from_database(
td::max(static_cast<int32>(max_notification_group_count_), DEFAULT_GROUP_COUNT_MAX) / 2, true);
load_message_notification_groups_from_database(td::max(static_cast<int32>(max_notification_group_count_), 10) / 2,
true);
}
}
@ -1362,7 +1382,7 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
return promise.set_value(Unit());
}
if (!is_permanent) {
if (!is_permanent && group_it->second.contains_messages) {
td_->messages_manager_->remove_message_notification(group_it->first.dialog_id, notification_id);
}
@ -1389,8 +1409,14 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
}
}
if (is_permanent) {
group_it->second.total_count--;
bool is_total_count_changed = false;
if ((group_it->second.contains_messages && is_permanent) || (!group_it->second.contains_messages && is_found)) {
if (group_it->second.total_count == 0) {
LOG(ERROR) << "Total notification count became negative in " << group_id << " after removing " << notification_id;
} else {
group_it->second.total_count--;
is_total_count_changed = true;
}
}
if (is_found) {
group_it->second.notifications.erase(group_it->second.notifications.begin() + notification_pos);
@ -1412,7 +1438,7 @@ void NotificationManager::remove_notification(NotificationGroupId group_id, Noti
}
}
if (is_permanent || !removed_notification_ids.empty()) {
if (is_total_count_changed || !removed_notification_ids.empty()) {
on_notifications_removed(std::move(group_it), std::move(added_notifications), std::move(removed_notification_ids));
}
@ -1451,7 +1477,9 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
if (max_notification_id.get() > current_notification_id_.get()) {
max_notification_id = current_notification_id_;
}
td_->messages_manager_->remove_message_notifications(group_it->first.dialog_id, max_notification_id);
if (group_it->second.contains_messages) {
td_->messages_manager_->remove_message_notifications(group_it->first.dialog_id, max_notification_id);
}
}
auto pending_delete_end = group_it->second.pending_notifications.begin();
@ -1494,16 +1522,21 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
}
}
VLOG(notifications) << "Need to delete " << notification_delete_end << " from "
<< group_it->second.notifications.size() << " notifications";
if (is_found) {
group_it->second.notifications.erase(group_it->second.notifications.begin(),
group_it->second.notifications.begin() + notification_delete_end);
}
if (!group_it->second.contains_messages) {
new_total_count = static_cast<int32>(group_it->second.notifications.size());
}
if (group_it->second.total_count == new_total_count) {
new_total_count = -1;
}
if (new_total_count != -1) {
group_it->second.total_count = new_total_count;
}
if (is_found) {
group_it->second.notifications.erase(group_it->second.notifications.begin(),
group_it->second.notifications.begin() + notification_delete_end);
}
if (new_total_count != -1 || !removed_notification_ids.empty()) {
on_notifications_removed(std::move(group_it), vector<td_api::object_ptr<td_api::notification>>(),
@ -1530,6 +1563,119 @@ void NotificationManager::remove_notification_group(NotificationGroupId group_id
promise.set_value(Unit());
}
NotificationGroupId NotificationManager::get_call_notification_group_id(DialogId dialog_id) {
auto it = dialog_id_to_call_notification_group_id_.find(dialog_id);
if (it != dialog_id_to_call_notification_group_id_.end()) {
return it->second;
}
if (available_call_notification_group_ids_.empty()) {
// need to reserve new group_id for calls
if (call_notification_group_ids_.size() >= MAX_CALL_NOTIFICATION_GROUPS) {
return {};
}
NotificationGroupId last_group_id;
if (!call_notification_group_ids_.empty()) {
last_group_id = call_notification_group_ids_.back();
}
NotificationGroupId next_notification_group_id;
do {
next_notification_group_id = get_next_notification_group_id();
} while (last_group_id.get() >= next_notification_group_id.get()); // just in case
VLOG(notifications) << "Add call " << next_notification_group_id;
call_notification_group_ids_.push_back(next_notification_group_id);
auto call_notification_group_ids_string = implode(
transform(call_notification_group_ids_, [](NotificationGroupId group_id) { return to_string(group_id.get()); }),
',');
G()->td_db()->get_binlog_pmc()->set("notification_call_group_ids", call_notification_group_ids_string);
available_call_notification_group_ids_.insert(next_notification_group_id);
}
auto available_it = available_call_notification_group_ids_.begin();
auto group_id = *available_it;
available_call_notification_group_ids_.erase(available_it);
dialog_id_to_call_notification_group_id_[dialog_id] = group_id;
return group_id;
}
void NotificationManager::add_call_notification(DialogId dialog_id, CallId call_id) {
CHECK(dialog_id.is_valid());
CHECK(call_id.is_valid());
if (is_disabled() || max_notification_group_count_ == 0) {
return;
}
auto group_id = get_call_notification_group_id(dialog_id);
if (!group_id.is_valid()) {
VLOG(notifications) << "Ignore notification about " << call_id << " in " << dialog_id;
return;
}
G()->td().get_actor_unsafe()->messages_manager_->force_create_dialog(dialog_id, "add_call_notification");
auto &active_notifications = active_call_notifications_[dialog_id];
if (active_notifications.size() >= MAX_CALL_NOTIFICATIONS) {
VLOG(notifications) << "Ignore notification about " << call_id << " in " << dialog_id << " and " << group_id;
return;
}
auto notification_id = get_next_notification_id();
active_notifications.push_back(ActiveCallNotification{call_id, notification_id});
add_notification(group_id, dialog_id, G()->unix_time() + 120, dialog_id, false, notification_id,
create_new_call_notification(call_id));
}
void NotificationManager::remove_call_notification(DialogId dialog_id, CallId call_id) {
CHECK(dialog_id.is_valid());
CHECK(call_id.is_valid());
if (is_disabled() || max_notification_group_count_ == 0) {
return;
}
auto group_id_it = dialog_id_to_call_notification_group_id_.find(dialog_id);
if (group_id_it == dialog_id_to_call_notification_group_id_.end()) {
VLOG(notifications) << "Ignore removing notification about " << call_id << " in " << dialog_id;
return;
}
auto group_id = group_id_it->second;
CHECK(group_id.is_valid());
auto &active_notifications = active_call_notifications_[dialog_id];
for (auto it = active_notifications.begin(); it != active_notifications.end(); ++it) {
if (it->call_id == call_id) {
remove_notification(group_id, it->notification_id, true, Promise<Unit>());
active_notifications.erase(it);
if (active_notifications.empty()) {
VLOG(notifications) << "Reuse call " << group_id;
active_call_notifications_.erase(dialog_id);
available_call_notification_group_ids_.insert(group_id);
dialog_id_to_call_notification_group_id_.erase(dialog_id);
flush_pending_notifications_timeout_.cancel_timeout(group_id.get());
flush_pending_notifications(group_id);
flush_pending_updates_timeout_.cancel_timeout(group_id.get());
flush_pending_updates(group_id.get(), "reuse call group_id");
auto group_it = get_group(group_id);
CHECK(group_it->first.dialog_id == dialog_id);
CHECK(group_it->first.last_notification_date == 0);
CHECK(group_it->second.total_count == 0);
CHECK(group_it->second.notifications.empty());
CHECK(group_it->second.pending_notifications.empty());
CHECK(!group_it->second.contains_messages);
CHECK(!group_it->second.is_being_loaded_from_database);
CHECK(pending_updates_.count(group_id.get()) == 0);
delete_group(std::move(group_it));
}
return;
}
}
VLOG(notifications) << "Failed to find " << call_id << " in " << dialog_id << " and " << group_id;
}
void NotificationManager::on_notification_group_count_max_changed(bool send_updates) {
if (is_disabled()) {
return;
@ -1586,8 +1732,7 @@ void NotificationManager::on_notification_group_count_max_changed(bool send_upda
max_notification_group_count_ = new_max_notification_group_count_size_t;
if (is_increased && last_loaded_notification_group_key_ < get_last_updated_group_key()) {
load_message_notification_groups_from_database(
td::max(new_max_notification_group_count, DEFAULT_GROUP_COUNT_MAX / 2), true);
load_message_notification_groups_from_database(td::max(new_max_notification_group_count, 5), true);
}
}

View File

@ -6,6 +6,7 @@
//
#pragma once
#include "td/telegram/CallId.h"
#include "td/telegram/DialogId.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/Notification.h"
@ -69,6 +70,10 @@ class NotificationManager : public Actor {
void remove_notification_group(NotificationGroupId group_id, NotificationId max_notification_id,
MessageId max_message_id, int32 new_total_count, Promise<Unit> &&promise);
void add_call_notification(DialogId dialog_id, CallId call_id);
void remove_call_notification(DialogId dialog_id, CallId call_id);
void on_notification_group_count_max_changed(bool send_updates);
void on_notification_group_size_max_changed();
@ -94,6 +99,9 @@ class NotificationManager : public Actor {
static constexpr int32 DEFAULT_GROUP_SIZE_MAX = 10;
static constexpr size_t EXTRA_GROUP_SIZE = 10;
static constexpr size_t MAX_CALL_NOTIFICATION_GROUPS = 10;
static constexpr size_t MAX_CALL_NOTIFICATIONS = 10;
static constexpr int32 DEFAULT_ONLINE_CLOUD_TIMEOUT_MS = 300000;
static constexpr int32 DEFAULT_ONLINE_CLOUD_DELAY_MS = 30000;
static constexpr int32 DEFAULT_DEFAULT_DELAY_MS = 1500;
@ -115,6 +123,7 @@ class NotificationManager : public Actor {
int32 total_count = 0;
bool is_loaded_from_database = false;
bool is_being_loaded_from_database = false;
bool contains_messages = true;
vector<Notification> notifications;
@ -192,6 +201,8 @@ class NotificationManager : public Actor {
void flush_all_pending_updates(bool include_delayed_chats, const char *source);
NotificationGroupId get_call_notification_group_id(DialogId dialog_id);
void after_get_difference_impl();
void after_get_chat_difference_impl(NotificationGroupId group_id);
@ -220,6 +231,16 @@ class NotificationManager : public Actor {
MultiTimeout flush_pending_notifications_timeout_{"FlushPendingNotificationsTimeout"};
MultiTimeout flush_pending_updates_timeout_{"FlushPendingUpdatesTimeout"};
vector<NotificationGroupId> call_notification_group_ids_;
std::unordered_set<NotificationGroupId, NotificationGroupIdHash> available_call_notification_group_ids_;
std::unordered_map<DialogId, NotificationGroupId, DialogIdHash> dialog_id_to_call_notification_group_id_;
struct ActiveCallNotification {
CallId call_id;
NotificationId notification_id;
};
std::unordered_map<DialogId, vector<ActiveCallNotification>, DialogIdHash> active_call_notifications_;
Td *td_;
ActorShared<> parent_;
};

View File

@ -20,6 +20,10 @@ class NotificationTypeMessage : public NotificationType {
return message_id_;
}
CallId get_call_id() const override {
return CallId();
}
td_api::object_ptr<td_api::NotificationType> get_notification_type_object(DialogId dialog_id) const override {
auto message_object = G()->td().get_actor_unsafe()->messages_manager_->get_message_object({dialog_id, message_id_});
if (message_object == nullptr) {
@ -52,6 +56,10 @@ class NotificationTypeSecretChat : public NotificationType {
return MessageId();
}
CallId get_call_id() const override {
return CallId();
}
td_api::object_ptr<td_api::NotificationType> get_notification_type_object(DialogId dialog_id) const override {
return td_api::make_object<td_api::notificationTypeNewSecretChat>();
}
@ -78,6 +86,10 @@ class NotificationTypeCall : public NotificationType {
return MessageId::max();
}
CallId get_call_id() const override {
return call_id_;
}
td_api::object_ptr<td_api::NotificationType> get_notification_type_object(DialogId dialog_id) const override {
return td_api::make_object<td_api::notificationTypeNewCall>(call_id_.get());
}

View File

@ -31,6 +31,8 @@ class NotificationType {
virtual MessageId get_message_id() const = 0;
virtual CallId get_call_id() const = 0;
virtual td_api::object_ptr<td_api::NotificationType> get_notification_type_object(DialogId dialog_id) const = 0;
virtual StringBuilder &to_string_builder(StringBuilder &string_builder) const = 0;

View File

@ -35,7 +35,7 @@ Status SqliteKeyValue::init_with_connection(SqliteDb connection, string table_na
get_stmt_ = std::move(get_stmt);
TRY_RESULT(erase_stmt, db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE k = ?1"));
erase_stmt_ = std::move(erase_stmt);
TRY_RESULT(get_all_stmt, db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << ""));
TRY_RESULT(get_all_stmt, db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_));
get_all_stmt_ = std::move(get_all_stmt);
TRY_RESULT(erase_by_prefix_stmt,