MAke push notifications persistent.

GitOrigin-RevId: 3cb231596e1f75a8157db48880e48f542c0b811d
This commit is contained in:
levlam 2019-03-31 04:30:25 +03:00
parent 8157f7e9ae
commit 28351e8529
6 changed files with 195 additions and 12 deletions

View File

@ -27,6 +27,9 @@
#include "td/mtproto/PacketInfo.h" #include "td/mtproto/PacketInfo.h"
#include "td/mtproto/Transport.h" #include "td/mtproto/Transport.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/utils/as.h" #include "td/utils/as.h"
#include "td/utils/base64.h" #include "td/utils/base64.h"
#include "td/utils/buffer.h" #include "td/utils/buffer.h"
@ -193,6 +196,8 @@ void NotificationManager::init() {
return; return;
} }
is_inited_ = true;
disable_contact_registered_notifications_ = disable_contact_registered_notifications_ =
G()->shared_config().get_option_boolean("disable_contact_registered_notifications"); G()->shared_config().get_option_boolean("disable_contact_registered_notifications");
auto sync_state = G()->td_db()->get_binlog_pmc()->get(get_is_contact_registered_notifications_synchronized_key()); auto sync_state = G()->td_db()->get_binlog_pmc()->get(get_is_contact_registered_notifications_synchronized_key());
@ -226,12 +231,10 @@ void NotificationManager::init() {
do { do {
loaded_groups += load_message_notification_groups_from_database(needed_groups, false); loaded_groups += load_message_notification_groups_from_database(needed_groups, false);
} while (loaded_groups < needed_groups && last_loaded_notification_group_key_.last_notification_date != 0); } while (loaded_groups < needed_groups && last_loaded_notification_group_key_.last_notification_date != 0);
auto update = get_update_active_notifications();
VLOG(notifications) << "Send " << as_active_notifications_update(update.get());
send_closure(G()->td(), &Td::send_update, std::move(update));
} }
try_send_update_active_notifications();
auto call_notification_group_ids_string = G()->td_db()->get_binlog_pmc()->get("notification_call_group_ids"); auto call_notification_group_ids_string = G()->td_db()->get_binlog_pmc()->get("notification_call_group_ids");
if (!call_notification_group_ids_string.empty()) { if (!call_notification_group_ids_string.empty()) {
call_notification_group_ids_ = transform(full_split(call_notification_group_ids_string, ','), [](Slice str) { call_notification_group_ids_ = transform(full_split(call_notification_group_ids_string, ','), [](Slice str) {
@ -1942,7 +1945,7 @@ void NotificationManager::remove_temporary_notifications(NotificationGroupId gro
bool is_total_count_changed = false; bool is_total_count_changed = false;
if (group.total_count == 0) { if (group.total_count == 0) {
LOG(ERROR) << "Total notification count became negative in " << group_id << " after removing " LOG(ERROR) << "Total notification count became negative in " << group_id << " after removing "
<< old_group_size - notification_pos << " temporary notificaitons"; << old_group_size - notification_pos << " temporary notificaitions";
} else { } else {
group.total_count -= narrow_cast<int32>(old_group_size - notification_pos); group.total_count -= narrow_cast<int32>(old_group_size - notification_pos);
is_total_count_changed = true; is_total_count_changed = true;
@ -2728,7 +2731,7 @@ Status NotificationManager::process_push_notification_payload(string payload) {
} else if (field_value.first == "google.sent_time") { } else if (field_value.first == "google.sent_time") {
TRY_RESULT(google_sent_time, get_json_object_long_field(json_value.get_object(), "google.sent_time")); TRY_RESULT(google_sent_time, get_json_object_long_field(json_value.get_object(), "google.sent_time"));
google_sent_time /= 1000; google_sent_time /= 1000;
if (sent_date - 86400 <= google_sent_time && google_sent_time <= sent_date + 5) { if (sent_date - 28 * 86400 <= google_sent_time && google_sent_time <= sent_date + 5) {
sent_date = narrow_cast<int32>(google_sent_time); sent_date = narrow_cast<int32>(google_sent_time);
} }
} else if (field_value.first == "google.notification.sound" && field_value.second.type() != JsonValue::Type::Null) { } else if (field_value.first == "google.notification.sound" && field_value.second.type() != JsonValue::Type::Null) {
@ -2938,19 +2941,114 @@ Status NotificationManager::process_push_notification_payload(string payload) {
return process_message_push_notification(dialog_id, MessageId(server_message_id), random_id, sender_user_id, return process_message_push_notification(dialog_id, MessageId(server_message_id), random_id, sender_user_id,
std::move(sender_name), sent_date, contains_mention, is_silent, std::move(sender_name), sent_date, contains_mention, is_silent,
std::move(loc_key), std::move(arg)); std::move(loc_key), std::move(arg), NotificationId(), 0);
} }
class NotificationManager::AddMessagePushNotificationLogEvent {
public:
DialogId dialog_id_;
MessageId message_id_;
int64 random_id_;
UserId sender_user_id_;
string sender_name_;
int32 date_;
bool contains_mention_;
bool is_silent_;
string loc_key_;
string arg_;
NotificationId notification_id_;
template <class StorerT>
void store(StorerT &storer) const {
bool has_message_id = message_id_.is_valid();
bool has_random_id = random_id_ != 0;
bool has_sender = sender_user_id_.is_valid();
bool has_sender_name = !sender_name_.empty();
bool has_arg = !arg_.empty();
BEGIN_STORE_FLAGS();
STORE_FLAG(contains_mention_);
STORE_FLAG(is_silent_);
STORE_FLAG(has_message_id);
STORE_FLAG(has_random_id);
STORE_FLAG(has_sender);
STORE_FLAG(has_sender_name);
STORE_FLAG(has_arg);
END_STORE_FLAGS();
td::store(dialog_id_, storer);
if (has_message_id) {
td::store(message_id_, storer);
}
if (has_random_id) {
td::store(random_id_, storer);
}
if (has_sender) {
td::store(sender_user_id_, storer);
}
if (has_sender_name) {
td::store(sender_name_, storer);
}
td::store(date_, storer);
td::store(loc_key_, storer);
if (has_arg) {
td::store(arg_, storer);
}
td::store(notification_id_, storer);
}
template <class ParserT>
void parse(ParserT &parser) {
bool has_message_id;
bool has_random_id;
bool has_sender;
bool has_sender_name;
bool has_arg;
BEGIN_PARSE_FLAGS();
PARSE_FLAG(contains_mention_);
PARSE_FLAG(is_silent_);
PARSE_FLAG(has_message_id);
PARSE_FLAG(has_random_id);
PARSE_FLAG(has_sender);
PARSE_FLAG(has_sender_name);
PARSE_FLAG(has_arg);
END_PARSE_FLAGS();
td::parse(dialog_id_, parser);
if (has_message_id) {
td::parse(message_id_, parser);
}
if (has_random_id) {
td::parse(random_id_, parser);
} else {
random_id_ = 0;
}
if (has_sender) {
td::parse(sender_user_id_, parser);
}
if (has_sender_name) {
td::parse(sender_name_, parser);
}
td::parse(date_, parser);
td::parse(loc_key_, parser);
if (has_arg) {
td::parse(arg_, parser);
}
td::parse(notification_id_, parser);
}
};
Status NotificationManager::process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id, Status NotificationManager::process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date, UserId sender_user_id, string sender_name, int32 date,
bool contains_mention, bool is_silent, string loc_key, bool contains_mention, bool is_silent, string loc_key,
string arg) { string arg, NotificationId notification_id,
int64 logevent_id) {
auto is_pinned = begins_with(loc_key, "PINNED_"); auto is_pinned = begins_with(loc_key, "PINNED_");
auto r_info = td_->messages_manager_->get_message_push_notification_info( auto r_info = td_->messages_manager_->get_message_push_notification_info(
dialog_id, message_id, random_id, sender_user_id, date, contains_mention, is_pinned); dialog_id, message_id, random_id, sender_user_id, date, contains_mention, is_pinned);
if (r_info.is_error()) { if (r_info.is_error()) {
VLOG(notifications) << "Don't need message push notification for " << message_id << "/" << random_id << " from " VLOG(notifications) << "Don't need message push notification for " << message_id << "/" << random_id << " from "
<< dialog_id << ": " << r_info.error(); << dialog_id << ": " << r_info.error();
if (logevent_id != 0) {
binlog_erase(G()->td_db()->get_binlog(), logevent_id);
}
return Status::OK(); return Status::OK();
} }
@ -2958,21 +3056,26 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
CHECK(info.group_id.is_valid()); CHECK(info.group_id.is_valid());
if (dialog_id.get_type() == DialogType::SecretChat) { if (dialog_id.get_type() == DialogType::SecretChat) {
VLOG(notifications) << "Skep notification in secret " << dialog_id; VLOG(notifications) << "Skip notification in secret " << dialog_id;
// TODO support secret chat notifications // TODO support secret chat notifications
// main problem: there is no message_id yet // main problem: there is no message_id yet
CHECK(logevent_id == 0);
return Status::OK(); return Status::OK();
} }
CHECK(random_id == 0); CHECK(random_id == 0);
if (is_disabled() || max_notification_group_count_ == 0) { if (is_disabled() || max_notification_group_count_ == 0) {
CHECK(logevent_id == 0);
return Status::OK(); return Status::OK();
} }
auto notification_id = get_next_notification_id(); if (!notification_id.is_valid()) {
CHECK(logevent_id == 0);
notification_id = get_next_notification_id();
if (!notification_id.is_valid()) { if (!notification_id.is_valid()) {
return Status::OK(); return Status::OK();
} }
}
if (sender_user_id.is_valid() && !td_->contacts_manager_->have_user(sender_user_id)) { if (sender_user_id.is_valid() && !td_->contacts_manager_->have_user(sender_user_id)) {
int32 flags = telegram_api::user::FIRST_NAME_MASK | telegram_api::user::MIN_MASK; int32 flags = telegram_api::user::FIRST_NAME_MASK | telegram_api::user::MIN_MASK;
@ -2984,6 +3087,18 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
td_->contacts_manager_->on_get_user(std::move(user), "process_message_push_notification"); td_->contacts_manager_->on_get_user(std::move(user), "process_message_push_notification");
} }
if (logevent_id == 0 && G()->parameters().use_message_db) {
AddMessagePushNotificationLogEvent logevent{dialog_id, message_id, random_id, sender_user_id,
sender_name, date, contains_mention, is_silent,
loc_key, arg, notification_id};
auto storer = LogEventStorerImpl<AddMessagePushNotificationLogEvent>(logevent);
logevent_id = binlog_add(G()->td_db()->get_binlog(), LogEvent::HandlerType::AddMessagePushNotification, storer);
}
if (logevent_id != 0) {
// TODO register logevent
}
auto group_id = info.group_id; auto group_id = info.group_id;
auto group_type = info.group_type; auto group_type = info.group_type;
auto settings_dialog_id = info.settings_dialog_id; auto settings_dialog_id = info.settings_dialog_id;
@ -3226,4 +3341,52 @@ void NotificationManager::on_pending_notification_update_count_changed(int32 dif
} }
} }
void NotificationManager::try_send_update_active_notifications() const {
if (max_notification_group_count_ == 0) {
return;
}
if (!is_binlog_processed_ || !is_inited_) {
return;
}
auto update = get_update_active_notifications();
VLOG(notifications) << "Send " << as_active_notifications_update(update.get());
send_closure(G()->td(), &Td::send_update, std::move(update));
}
void NotificationManager::on_binlog_events(vector<BinlogEvent> &&events) {
VLOG(notifications) << "Begin to process " << events.size() << " binlog events";
for (auto &event : events) {
switch (event.type_) {
case LogEvent::HandlerType::AddMessagePushNotification: {
if (!G()->parameters().use_message_db || is_disabled() || max_notification_group_count_ == 0) {
binlog_erase(G()->td_db()->get_binlog(), event.id_);
break;
}
CHECK(is_inited_);
AddMessagePushNotificationLogEvent log_event;
log_event_parse(log_event, event.data_).ensure();
auto status = process_message_push_notification(
log_event.dialog_id_, log_event.message_id_, log_event.random_id_, log_event.sender_user_id_,
log_event.sender_name_, log_event.date_, log_event.contains_mention_, log_event.is_silent_,
log_event.loc_key_, log_event.arg_, log_event.notification_id_, event.id_);
if (status.is_error()) {
LOG(ERROR) << "Receive error " << status << ", while processing message push notification";
}
break;
}
default:
LOG(FATAL) << "Unsupported logevent type " << event.type_;
}
}
if (is_inited_) {
flush_all_pending_notifications();
}
is_binlog_processed_ = true;
try_send_update_active_notifications();
VLOG(notifications) << "Finish processing binlog events";
}
} // namespace td } // namespace td

View File

@ -36,6 +36,8 @@ namespace td {
extern int VERBOSITY_NAME(notifications); extern int VERBOSITY_NAME(notifications);
struct BinlogEvent;
class Td; class Td;
class NotificationManager : public Actor { class NotificationManager : public Actor {
@ -119,6 +121,8 @@ class NotificationManager : public Actor {
void destroy_all_notifications(); void destroy_all_notifications();
void on_binlog_events(vector<BinlogEvent> &&events);
private: private:
static constexpr int32 DEFAULT_GROUP_COUNT_MAX = 0; static constexpr int32 DEFAULT_GROUP_COUNT_MAX = 0;
static constexpr int32 DEFAULT_GROUP_SIZE_MAX = 10; static constexpr int32 DEFAULT_GROUP_SIZE_MAX = 10;
@ -138,6 +142,8 @@ class NotificationManager : public Actor {
static constexpr int32 ANNOUNCEMENT_ID_CACHE_TIME = 7 * 86400; static constexpr int32 ANNOUNCEMENT_ID_CACHE_TIME = 7 * 86400;
class AddMessagePushNotificationLogEvent;
struct PendingNotification { struct PendingNotification {
int32 date = 0; int32 date = 0;
DialogId settings_dialog_id; DialogId settings_dialog_id;
@ -227,6 +233,8 @@ class NotificationManager : public Actor {
NotificationGroupKey get_last_updated_group_key() const; NotificationGroupKey get_last_updated_group_key() const;
void try_send_update_active_notifications() const;
td_api::object_ptr<td_api::updateActiveNotifications> get_update_active_notifications() const; td_api::object_ptr<td_api::updateActiveNotifications> get_update_active_notifications() const;
td_api::object_ptr<td_api::updateNotificationGroup> get_remove_group_update( td_api::object_ptr<td_api::updateNotificationGroup> get_remove_group_update(
@ -272,7 +280,8 @@ class NotificationManager : public Actor {
Status process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id, Status process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date, bool contains_mention, UserId sender_user_id, string sender_name, int32 date, bool contains_mention,
bool is_silent, string loc_key, string arg); bool is_silent, string loc_key, string arg, NotificationId notification_id,
int64 logevent_id);
void after_get_difference_impl(); void after_get_difference_impl();
@ -310,6 +319,9 @@ class NotificationManager : public Actor {
bool is_destroyed_ = false; bool is_destroyed_ = false;
bool is_inited_ = false;
bool is_binlog_processed_ = false;
bool running_get_difference_ = false; bool running_get_difference_ = false;
std::unordered_set<int32> running_get_chat_difference_; std::unordered_set<int32> running_get_chat_difference_;

View File

@ -4286,6 +4286,9 @@ Status Td::init(DbKey key) {
send_closure_later(messages_manager_actor_, &MessagesManager::on_binlog_events, send_closure_later(messages_manager_actor_, &MessagesManager::on_binlog_events,
std::move(events.to_messages_manager)); std::move(events.to_messages_manager));
send_closure_later(notification_manager_actor_, &NotificationManager::on_binlog_events,
std::move(events.to_notification_manager));
// NB: be very careful. This notification may be received before all binlog events are. // NB: be very careful. This notification may be received before all binlog events are.
G()->on_binlog_replay_finish(); G()->on_binlog_replay_finish();
send_closure(secret_chats_manager_, &SecretChatsManager::binlog_replay_finish); send_closure(secret_chats_manager_, &SecretChatsManager::binlog_replay_finish);

View File

@ -104,6 +104,9 @@ Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_p
case LogEvent::HandlerType::ToggleDialogIsMarkedAsUnreadOnServer: case LogEvent::HandlerType::ToggleDialogIsMarkedAsUnreadOnServer:
events.to_messages_manager.push_back(event.clone()); events.to_messages_manager.push_back(event.clone());
break; break;
case LogEvent::HandlerType::AddMessagePushNotification:
events.to_notification_manager.push_back(event.clone());
break;
case LogEvent::HandlerType::BinlogPmcMagic: case LogEvent::HandlerType::BinlogPmcMagic:
binlog_pmc.external_init_handle(event); binlog_pmc.external_init_handle(event);
break; break;

View File

@ -57,6 +57,7 @@ class TdDb {
vector<BinlogEvent> web_page_events; vector<BinlogEvent> web_page_events;
vector<BinlogEvent> to_poll_manager; vector<BinlogEvent> to_poll_manager;
vector<BinlogEvent> to_messages_manager; vector<BinlogEvent> to_messages_manager;
vector<BinlogEvent> to_notification_manager;
}; };
static Result<unique_ptr<TdDb>> open(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events); static Result<unique_ptr<TdDb>> open(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events);

View File

@ -94,6 +94,7 @@ class LogEvent {
ReadHistoryInSecretChat = 0x114, ReadHistoryInSecretChat = 0x114,
ToggleDialogIsMarkedAsUnreadOnServer = 0x115, ToggleDialogIsMarkedAsUnreadOnServer = 0x115,
GetChannelDifference = 0x140, GetChannelDifference = 0x140,
AddMessagePushNotification = 0x200,
ConfigPmcMagic = 0x1f18, ConfigPmcMagic = 0x1f18,
BinlogPmcMagic = 0x4327 BinlogPmcMagic = 0x4327
}; };