Send result to processPushNotification only after the updateNotificationGroup with the notification is sent.

GitOrigin-RevId: d7b814c0ae6ea0555444d9ec54a570e440889b67
This commit is contained in:
levlam 2019-04-02 22:24:54 +03:00
parent 1d4b1570ad
commit b5802e435f
5 changed files with 73 additions and 36 deletions

View File

@ -4899,7 +4899,7 @@ MessagesManager::Dialog *MessagesManager::get_service_notifications_dialog() {
}
void MessagesManager::on_update_service_notification(tl_object_ptr<telegram_api::updateServiceNotification> &&update,
bool skip_new_entities) {
bool skip_new_entities, Promise<Unit> &&promise) {
int32 ttl = 0;
bool has_date = (update->flags_ & telegram_api::updateServiceNotification::INBOX_DATE_MASK) != 0;
auto date = has_date ? update->inbox_date_ : G()->unix_time();
@ -4944,6 +4944,7 @@ void MessagesManager::on_update_service_notification(tl_object_ptr<telegram_api:
send_update_chat_last_message(d, "on_update_service_notification");
}
}
promise.set_value(Unit());
}
void MessagesManager::on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update) {

View File

@ -282,7 +282,7 @@ class MessagesManager : public Actor {
void on_update_dialog_pinned_message_id(DialogId dialog_id, MessageId pinned_message_id);
void on_update_service_notification(tl_object_ptr<telegram_api::updateServiceNotification> &&update,
bool skip_new_entities);
bool skip_new_entities, Promise<Unit> &&promise);
void on_update_new_channel_message(tl_object_ptr<telegram_api::updateNewChannelMessage> &&update);

View File

@ -1317,6 +1317,17 @@ void NotificationManager::flush_pending_updates(int32 group_id, const char *sour
send_closure(G()->td(), &Td::send_update, std::move(update));
}
on_delayed_notification_update_count_changed(-1, group_id, "flush_pending_updates");
auto group_it = get_group_force(NotificationGroupId(group_id));
CHECK(group_it != groups_.end());
NotificationGroup &group = group_it->second;
for (auto &notification : group.notifications) {
auto promise_it = push_notification_promises_.find(notification.notification_id);
if (promise_it != push_notification_promises_.end()) {
promise_it->second.set_value(Unit());
push_notification_promises_.erase(promise_it);
}
}
}
void NotificationManager::force_flush_pending_updates(NotificationGroupId group_id, const char *source) {
@ -1610,6 +1621,12 @@ void NotificationManager::edit_notification(NotificationGroupId group_id, Notifi
void NotificationManager::on_notification_removed(NotificationId notification_id) {
VLOG(notifications) << "In on_notification_removed with " << notification_id;
auto promise_it = push_notification_promises_.find(notification_id);
if (promise_it != push_notification_promises_.end()) {
promise_it->second.set_value(Unit());
push_notification_promises_.erase(promise_it);
}
auto it = temporary_notification_logevent_ids_.find(notification_id);
if (it == temporary_notification_logevent_ids_.end()) {
return;
@ -2578,7 +2595,7 @@ void NotificationManager::process_push_notification(string payload, Promise<Unit
}
if (receiver_id == 0 || receiver_id == G()->get_my_id()) {
auto status = process_push_notification_payload(payload);
auto status = process_push_notification_payload(payload, promise);
if (status.is_error()) {
if (status.code() == 406) {
return promise.set_error(std::move(status));
@ -2587,7 +2604,7 @@ void NotificationManager::process_push_notification(string payload, Promise<Unit
LOG(ERROR) << "Receive error " << status << ", while parsing push payload " << payload;
return promise.set_error(Status::Error(400, status.message()));
}
promise.set_value(Unit());
// promise will be set after updateNotificationGroup is sent to the client
return;
}
@ -2780,7 +2797,7 @@ string NotificationManager::convert_loc_key(const string &loc_key) {
return string();
}
Status NotificationManager::process_push_notification_payload(string payload) {
Status NotificationManager::process_push_notification_payload(string payload, Promise<Unit> &promise) {
VLOG(notifications) << "Process push notification payload " << payload;
auto r_json_value = json_decode(payload);
if (r_json_value.is_error()) {
@ -2857,6 +2874,7 @@ Status NotificationManager::process_push_notification_payload(string payload) {
auto now = G()->unix_time();
if (date >= now - ANNOUNCEMENT_ID_CACHE_TIME) {
VLOG(notifications) << "Ignore duplicate announcement " << announcement_id;
promise.set_value(Unit());
return Status::OK();
}
date = now;
@ -2864,7 +2882,8 @@ Status NotificationManager::process_push_notification_payload(string payload) {
auto update = telegram_api::make_object<telegram_api::updateServiceNotification>(
telegram_api::updateServiceNotification::INBOX_DATE_MASK, false, G()->unix_time(), string(),
announcement_message_text, nullptr, vector<telegram_api::object_ptr<telegram_api::MessageEntity>>());
send_closure(G()->messages_manager(), &MessagesManager::on_update_service_notification, std::move(update), false);
send_closure(G()->messages_manager(), &MessagesManager::on_update_service_notification, std::move(update), false,
std::move(promise));
save_announcement_ids();
return Status::OK();
}
@ -2882,16 +2901,18 @@ Status NotificationManager::process_push_notification_payload(string payload) {
return Status::Error(PSLICE() << "Receive invalid addr " << format::escaped(addr));
}
send_closure(G()->connection_creator(), &ConnectionCreator::on_dc_update, DcId::internal(dc_id), std::move(addr),
Promise<Unit>());
std::move(promise));
return Status::OK();
}
if (loc_key == "LOCKED_MESSAGE") {
promise.set_value(Unit());
return Status::OK();
}
if (loc_key == "AUTH_REGION" || loc_key == "AUTH_UNKNOWN") {
// TODO
promise.set_value(Unit());
return Status::OK();
}
@ -2944,8 +2965,9 @@ Status NotificationManager::process_push_notification_payload(string payload) {
return Status::Error("Receive invalid max_id");
}
send_closure(G()->messages_manager(), &MessagesManager::read_history_inbox, dialog_id,
MessageId(max_server_message_id), -1, "process_push_notification_payload");
td_->messages_manager_->read_history_inbox(dialog_id, MessageId(max_server_message_id), -1,
"process_push_notification_payload");
promise.set_value(Unit());
return Status::OK();
}
@ -3043,9 +3065,10 @@ Status NotificationManager::process_push_notification_payload(string payload) {
arg = std::move(loc_args[0]);
}
return process_message_push_notification(dialog_id, MessageId(server_message_id), random_id, sender_user_id,
std::move(sender_name), sent_date, contains_mention, is_silent,
std::move(loc_key), std::move(arg), NotificationId(), 0);
process_message_push_notification(dialog_id, MessageId(server_message_id), random_id, sender_user_id,
std::move(sender_name), sent_date, contains_mention, is_silent, std::move(loc_key),
std::move(arg), NotificationId(), 0, std::move(promise));
return Status::OK();
}
class NotificationManager::AddMessagePushNotificationLogEvent {
@ -3139,11 +3162,11 @@ class NotificationManager::AddMessagePushNotificationLogEvent {
}
};
Status NotificationManager::process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date,
bool contains_mention, bool is_silent, string loc_key,
string arg, NotificationId notification_id,
uint64 logevent_id) {
void NotificationManager::process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date,
bool contains_mention, bool is_silent, string loc_key,
string arg, NotificationId notification_id,
uint64 logevent_id, Promise<Unit> promise) {
auto is_pinned = begins_with(loc_key, "PINNED_");
auto r_info = td_->messages_manager_->get_message_push_notification_info(
dialog_id, message_id, random_id, sender_user_id, date, contains_mention, is_pinned, logevent_id != 0);
@ -3154,9 +3177,11 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
binlog_erase(G()->td_db()->get_binlog(), logevent_id);
}
if (r_info.error().code() == 406) {
return r_info.move_as_error();
promise.set_error(r_info.move_as_error());
} else {
promise.set_value(Unit());
}
return Status::OK();
return;
}
auto info = r_info.move_as_ok();
@ -3168,20 +3193,20 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
// main problem: there is no message_id yet
// also don't forget to delete newSecretChat notification
CHECK(logevent_id == 0);
return Status::Error(406, "Secret chat push notifications are unsupported");
return promise.set_error(Status::Error(406, "Secret chat push notifications are unsupported"));
}
CHECK(random_id == 0);
if (is_disabled() || max_notification_group_count_ == 0) {
CHECK(logevent_id == 0);
return Status::OK();
return promise.set_value(Unit());
}
if (!notification_id.is_valid()) {
CHECK(logevent_id == 0);
notification_id = get_next_notification_id();
if (!notification_id.is_valid()) {
return Status::OK();
return promise.set_value(Unit());
}
}
@ -3207,6 +3232,7 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
VLOG(notifications) << "Register temporary " << notification_id << " with logevent " << logevent_id;
temporary_notification_logevent_ids_[notification_id] = logevent_id;
}
push_notification_promises_[notification_id] = std::move(promise);
auto group_id = info.group_id;
CHECK(group_id.is_valid());
@ -3220,7 +3246,6 @@ Status NotificationManager::process_message_push_notification(DialogId dialog_id
add_notification(
group_id, group_type, dialog_id, date, settings_dialog_id, is_silent, 0, notification_id,
create_new_push_message_notification(sender_user_id, message_id, std::move(loc_key), std::move(arg)));
return Status::OK();
}
Result<int64> NotificationManager::get_push_receiver_id(string payload) {
@ -3450,6 +3475,10 @@ void NotificationManager::destroy_all_notifications() {
on_unreceived_notification_update_count_changed(-unreceived_notification_update_count_, 0,
"destroy_all_notifications");
}
for (auto &it : push_notification_promises_) {
it.second.set_value(Unit());
}
push_notification_promises_.clear();
is_destroyed_ = true;
}
@ -3497,7 +3526,7 @@ void NotificationManager::on_unreceived_notification_update_count_changed(int32
}
}
void NotificationManager::try_send_update_active_notifications() const {
void NotificationManager::try_send_update_active_notifications() {
if (max_notification_group_count_ == 0) {
return;
}
@ -3508,6 +3537,11 @@ void NotificationManager::try_send_update_active_notifications() const {
auto update = get_update_active_notifications();
VLOG(notifications) << "Send " << as_active_notifications_update(update.get());
send_closure(G()->td(), &Td::send_update, std::move(update));
for (auto &it : push_notification_promises_) {
it.second.set_value(Unit());
}
push_notification_promises_.clear();
}
void NotificationManager::on_binlog_events(vector<BinlogEvent> &&events) {
@ -3524,13 +3558,14 @@ void NotificationManager::on_binlog_events(vector<BinlogEvent> &&events) {
AddMessagePushNotificationLogEvent log_event;
log_event_parse(log_event, event.data_).ensure();
auto status = process_message_push_notification(
process_message_push_notification(
log_event.dialog_id_, log_event.message_id_, log_event.random_id_, log_event.sender_user_id_,
log_event.sender_name_, log_event.date_, log_event.contains_mention_, true, log_event.loc_key_,
log_event.arg_, log_event.notification_id_, event.id_);
if (status.is_error()) {
LOG(ERROR) << "Receive error " << status << ", while processing message push notification";
}
log_event.arg_, log_event.notification_id_, event.id_, PromiseCreator::lambda([](Result<Unit> result) {
if (result.is_error()) {
LOG(ERROR) << "Receive error " << result.error() << ", while processing message push notification";
}
}));
break;
}
default:

View File

@ -238,7 +238,7 @@ class NotificationManager : public Actor {
NotificationGroupKey get_last_updated_group_key() const;
void try_send_update_active_notifications() const;
void try_send_update_active_notifications();
void send_update_have_pending_notifications() const;
@ -287,12 +287,12 @@ class NotificationManager : public Actor {
static string convert_loc_key(const string &loc_key);
Status process_push_notification_payload(string payload);
Status process_push_notification_payload(string payload, Promise<Unit> &promise);
Status process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date, bool contains_mention,
bool is_silent, string loc_key, string arg, NotificationId notification_id,
uint64 logevent_id);
void process_message_push_notification(DialogId dialog_id, MessageId message_id, int64 random_id,
UserId sender_user_id, string sender_name, int32 date, bool contains_mention,
bool is_silent, string loc_key, string arg, NotificationId notification_id,
uint64 logevent_id, Promise<Unit> promise);
void after_get_difference_impl();
@ -352,6 +352,7 @@ class NotificationManager : public Actor {
std::unordered_map<DialogId, NotificationGroupId, DialogIdHash> dialog_id_to_call_notification_group_id_;
std::unordered_map<NotificationId, uint64, NotificationIdHash> temporary_notification_logevent_ids_;
std::unordered_map<NotificationId, Promise<Unit>, NotificationIdHash> push_notification_promises_;
struct ActiveCallNotification {
CallId call_id;

View File

@ -1458,7 +1458,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutb
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, bool /*force_apply*/) {
CHECK(update != nullptr);
td_->messages_manager_->on_update_service_notification(std::move(update), true);
td_->messages_manager_->on_update_service_notification(std::move(update), true, Promise<Unit>());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelInbox> update, bool /*force_apply*/) {