Combine and delay addPaidMessageReaction queries.

This commit is contained in:
levlam 2024-08-04 12:26:40 +03:00
parent 7d93a305ad
commit 4dc42bc1bd
5 changed files with 127 additions and 24 deletions

View File

@ -701,6 +701,7 @@ void MessageReactions::update_from(const MessageReactions &old_reactions, Dialog
} }
} }
} }
pending_paid_reactions_ = old_reactions.pending_paid_reactions_;
} }
bool MessageReactions::add_my_reaction(const ReactionType &reaction_type, bool is_big, DialogId my_dialog_id, bool MessageReactions::add_my_reaction(const ReactionType &reaction_type, bool is_big, DialogId my_dialog_id,
@ -797,12 +798,11 @@ bool MessageReactions::do_remove_my_reaction(const ReactionType &reaction_type)
} }
void MessageReactions::add_my_paid_reaction(int32 star_count) { void MessageReactions::add_my_paid_reaction(int32 star_count) {
auto added_reaction = get_reaction(ReactionType::paid()); if (pending_paid_reactions_ > 1000000000 || star_count > 1000000000) {
if (added_reaction == nullptr) { LOG(ERROR) << "Pending paid reactions overflown";
reactions_.push_back({ReactionType::paid(), star_count, true, DialogId(), Auto(), Auto()}); return;
} else {
added_reaction->add_paid_reaction(star_count);
} }
pending_paid_reactions_ += star_count;
} }
void MessageReactions::sort_reactions(const FlatHashMap<ReactionType, size_t, ReactionTypeHash> &active_reaction_pos) { void MessageReactions::sort_reactions(const FlatHashMap<ReactionType, size_t, ReactionTypeHash> &active_reaction_pos) {
@ -907,6 +907,23 @@ bool MessageReactions::are_consistent_with_list(
} }
} }
vector<MessageReactor> MessageReactions::apply_reactor_pending_paid_reactions(DialogId my_dialog_id) const {
vector<MessageReactor> top_reactors;
bool was_me = false;
for (auto &reactor : top_reactors_) {
top_reactors.push_back(reactor);
if (reactor.is_me()) {
was_me = true;
top_reactors.back().add_count(pending_paid_reactions_);
}
}
if (!was_me) {
top_reactors.emplace_back(my_dialog_id, pending_paid_reactions_);
}
MessageReactor::fix_message_reactors(top_reactors, false);
return top_reactors;
}
td_api::object_ptr<td_api::messageReactions> MessageReactions::get_message_reactions_object(Td *td, UserId my_user_id, td_api::object_ptr<td_api::messageReactions> MessageReactions::get_message_reactions_object(Td *td, UserId my_user_id,
UserId peer_user_id) const { UserId peer_user_id) const {
auto reactions = transform(reactions_, [td, my_user_id, peer_user_id](const MessageReaction &reaction) { auto reactions = transform(reactions_, [td, my_user_id, peer_user_id](const MessageReaction &reaction) {
@ -914,6 +931,20 @@ td_api::object_ptr<td_api::messageReactions> MessageReactions::get_message_react
}); });
auto reactors = auto reactors =
transform(top_reactors_, [td](const MessageReactor &reactor) { return reactor.get_paid_reactor_object(td); }); transform(top_reactors_, [td](const MessageReactor &reactor) { return reactor.get_paid_reactor_object(td); });
if (pending_paid_reactions_ > 0) {
if (reactions_.empty() || !reactions_[0].reaction_type_.is_paid_reaction()) {
reactions.insert(reactions.begin(),
MessageReaction(ReactionType::paid(), pending_paid_reactions_, true, DialogId(), Auto(), Auto())
.get_message_reaction_object(td, my_user_id, peer_user_id));
} else {
reactions[0]->total_count_ += pending_paid_reactions_;
reactions[0]->is_chosen_ = true;
}
auto top_reactors = apply_reactor_pending_paid_reactions(DialogId(my_user_id));
reactors =
transform(top_reactors, [td](const MessageReactor &reactor) { return reactor.get_paid_reactor_object(td); });
}
return td_api::make_object<td_api::messageReactions>(std::move(reactions), are_tags_, std::move(reactors)); return td_api::make_object<td_api::messageReactions>(std::move(reactions), are_tags_, std::move(reactors));
} }
@ -965,6 +996,24 @@ bool MessageReactions::need_update_unread_reactions(const MessageReactions *old_
return new_reactions == nullptr || old_reactions->unread_reactions_ != new_reactions->unread_reactions_; return new_reactions == nullptr || old_reactions->unread_reactions_ != new_reactions->unread_reactions_;
} }
void MessageReactions::send_paid_message_reaction(Td *td, MessageFullId message_full_id, int64 random_id,
Promise<Unit> &&promise) {
if (pending_paid_reactions_ == 0) {
return promise.set_value(Unit());
}
auto star_count = pending_paid_reactions_;
top_reactors_ = apply_reactor_pending_paid_reactions(td->dialog_manager_->get_my_dialog_id());
if (reactions_.empty() || !reactions_[0].reaction_type_.is_paid_reaction()) {
reactions_.insert(reactions_.begin(),
MessageReaction(ReactionType::paid(), star_count, true, DialogId(), Auto(), Auto()));
} else {
reactions_[0].add_paid_reaction(star_count);
}
pending_paid_reactions_ = 0;
td->create_handler<SendPaidReactionQuery>(std::move(promise))->send(message_full_id, star_count, random_id);
}
StringBuilder &operator<<(StringBuilder &string_builder, const MessageReactions &reactions) { StringBuilder &operator<<(StringBuilder &string_builder, const MessageReactions &reactions) {
if (reactions.are_tags_) { if (reactions.are_tags_) {
return string_builder << "MessageTags{" << reactions.reactions_ << '}'; return string_builder << "MessageTags{" << reactions.reactions_ << '}';
@ -973,7 +1022,8 @@ StringBuilder &operator<<(StringBuilder &string_builder, const MessageReactions
<< " with unread " << reactions.unread_reactions_ << ", reaction order " << " with unread " << reactions.unread_reactions_ << ", reaction order "
<< reactions.chosen_reaction_order_ << reactions.chosen_reaction_order_
<< " and can_get_added_reactions = " << reactions.can_get_added_reactions_ << " and can_get_added_reactions = " << reactions.can_get_added_reactions_
<< " with paid reactions by " << reactions.top_reactors_ << '}'; << " with paid reactions by " << reactions.top_reactors_ << " and "
<< reactions.pending_paid_reactions_ << " pending paid reactions}";
} }
StringBuilder &operator<<(StringBuilder &string_builder, const unique_ptr<MessageReactions> &reactions) { StringBuilder &operator<<(StringBuilder &string_builder, const unique_ptr<MessageReactions> &reactions) {
@ -1021,11 +1071,6 @@ void set_message_reactions(Td *td, MessageFullId message_full_id, vector<Reactio
send_message_reaction(td, message_full_id, std::move(reaction_types), is_big, false, std::move(promise)); send_message_reaction(td, message_full_id, std::move(reaction_types), is_big, false, std::move(promise));
} }
void send_paid_message_reaction(Td *td, MessageFullId message_full_id, int32 star_count, int64 random_id,
Promise<Unit> &&promise) {
td->create_handler<SendPaidReactionQuery>(std::move(promise))->send(message_full_id, star_count, random_id);
}
void get_message_added_reactions(Td *td, MessageFullId message_full_id, ReactionType reaction_type, string offset, void get_message_added_reactions(Td *td, MessageFullId message_full_id, ReactionType reaction_type, string offset,
int32 limit, Promise<td_api::object_ptr<td_api::addedReactions>> &&promise) { int32 limit, Promise<td_api::object_ptr<td_api::addedReactions>> &&promise) {
if (!td->messages_manager_->have_message_force(message_full_id, "get_message_added_reactions")) { if (!td->messages_manager_->have_message_force(message_full_id, "get_message_added_reactions")) {

View File

@ -157,6 +157,7 @@ struct MessageReactions {
vector<UnreadMessageReaction> unread_reactions_; vector<UnreadMessageReaction> unread_reactions_;
vector<ReactionType> chosen_reaction_order_; vector<ReactionType> chosen_reaction_order_;
vector<MessageReactor> top_reactors_; vector<MessageReactor> top_reactors_;
int32 pending_paid_reactions_ = 0;
bool is_min_ = false; bool is_min_ = false;
bool need_polling_ = true; bool need_polling_ = true;
bool can_get_added_reactions_ = false; bool can_get_added_reactions_ = false;
@ -205,6 +206,8 @@ struct MessageReactions {
static bool need_update_unread_reactions(const MessageReactions *old_reactions, static bool need_update_unread_reactions(const MessageReactions *old_reactions,
const MessageReactions *new_reactions); const MessageReactions *new_reactions);
void send_paid_message_reaction(Td *td, MessageFullId message_full_id, int64 random_id, Promise<Unit> &&promise);
template <class StorerT> template <class StorerT>
void store(StorerT &storer) const; void store(StorerT &storer) const;
@ -213,6 +216,8 @@ struct MessageReactions {
private: private:
bool do_remove_my_reaction(const ReactionType &reaction_type); bool do_remove_my_reaction(const ReactionType &reaction_type);
vector<MessageReactor> apply_reactor_pending_paid_reactions(DialogId my_dialog_id) const;
}; };
StringBuilder &operator<<(StringBuilder &string_builder, const MessageReactions &reactions); StringBuilder &operator<<(StringBuilder &string_builder, const MessageReactions &reactions);
@ -227,9 +232,6 @@ void send_message_reaction(Td *td, MessageFullId message_full_id, vector<Reactio
void set_message_reactions(Td *td, MessageFullId message_full_id, vector<ReactionType> reaction_types, bool is_big, void set_message_reactions(Td *td, MessageFullId message_full_id, vector<ReactionType> reaction_types, bool is_big,
Promise<Unit> &&promise); Promise<Unit> &&promise);
void send_paid_message_reaction(Td *td, MessageFullId message_full_id, int32 star_count, int64 random_id,
Promise<Unit> &&promise);
void get_message_added_reactions(Td *td, MessageFullId message_full_id, ReactionType reaction_type, string offset, void get_message_added_reactions(Td *td, MessageFullId message_full_id, ReactionType reaction_type, string offset,
int32 limit, Promise<td_api::object_ptr<td_api::addedReactions>> &&promise); int32 limit, Promise<td_api::object_ptr<td_api::addedReactions>> &&promise);

View File

@ -50,6 +50,10 @@ class MessageReactor {
bool fix_is_me(DialogId my_dialog_id); bool fix_is_me(DialogId my_dialog_id);
void add_count(int32 count) {
count_ += count;
}
td_api::object_ptr<td_api::paidReactor> get_paid_reactor_object(Td *td) const; td_api::object_ptr<td_api::paidReactor> get_paid_reactor_object(Td *td) const;
void add_dependencies(Dependencies &dependencies) const; void add_dependencies(Dependencies &dependencies) const;

View File

@ -5558,6 +5558,9 @@ MessagesManager::MessagesManager(Td *td, ActorShared<> parent)
send_update_chat_read_inbox_timeout_.set_callback(on_send_update_chat_read_inbox_timeout_callback); send_update_chat_read_inbox_timeout_.set_callback(on_send_update_chat_read_inbox_timeout_callback);
send_update_chat_read_inbox_timeout_.set_callback_data(static_cast<void *>(this)); send_update_chat_read_inbox_timeout_.set_callback_data(static_cast<void *>(this));
send_paid_reactions_timeout_.set_callback(on_send_paid_reactions_timeout_callback);
send_paid_reactions_timeout_.set_callback_data(static_cast<void *>(this));
} }
MessagesManager::~MessagesManager() { MessagesManager::~MessagesManager() {
@ -5701,6 +5704,16 @@ void MessagesManager::on_send_update_chat_read_inbox_timeout_callback(void *mess
&MessagesManager::on_send_update_chat_read_inbox_timeout, DialogId(dialog_id_int)); &MessagesManager::on_send_update_chat_read_inbox_timeout, DialogId(dialog_id_int));
} }
void MessagesManager::on_send_paid_reactions_timeout_callback(void *messages_manager_ptr, int64 task_id) {
if (G()->close_flag()) {
return;
}
auto messages_manager = static_cast<MessagesManager *>(messages_manager_ptr);
send_closure_later(messages_manager->actor_id(messages_manager), &MessagesManager::on_send_paid_reactions_timeout,
task_id);
}
BufferSlice MessagesManager::get_dialog_database_value(const Dialog *d) { BufferSlice MessagesManager::get_dialog_database_value(const Dialog *d) {
// can't use log_event_store, because it tries to parse stored Dialog // can't use log_event_store, because it tries to parse stored Dialog
LogEventStorerCalcLength storer_calc_length; LogEventStorerCalcLength storer_calc_length;
@ -12068,6 +12081,39 @@ void MessagesManager::on_send_update_chat_read_inbox_timeout(DialogId dialog_id)
} }
} }
void MessagesManager::on_send_paid_reactions_timeout(int64 task_id) {
if (G()->close_flag()) {
return;
}
auto it = paid_reaction_tasks_.find(task_id);
if (it == paid_reaction_tasks_.end()) {
return;
}
auto message_full_id = it->second;
paid_reaction_tasks_.erase(it);
bool is_erased = paid_reaction_task_ids_.erase(message_full_id) > 0;
CHECK(is_erased);
Dialog *d = get_dialog_force(message_full_id.get_dialog_id(), "on_send_paid_reactions_timeout");
CHECK(d != nullptr);
auto *m = get_message_force(d, message_full_id.get_message_id(), "on_send_paid_reactions_timeout");
if (m == nullptr || m->reactions == nullptr) {
return;
}
if (!get_message_available_reactions(d, m, true, nullptr).is_allowed_reaction_type(ReactionType::paid())) {
// TODO drop pending reactions
return;
}
pending_reactions_[message_full_id].query_count++;
int64 random_id = (static_cast<int64>(G()->unix_time()) << 32) | static_cast<int64>(Random::secure_uint32());
auto promise = PromiseCreator::lambda([actor_id = actor_id(this), message_full_id](Result<Unit> &&result) {
send_closure(actor_id, &MessagesManager::on_set_message_reactions, message_full_id, std::move(result), Auto());
});
m->reactions->send_paid_message_reaction(td_, message_full_id, random_id, std::move(promise));
}
int32 MessagesManager::get_message_date(const tl_object_ptr<telegram_api::Message> &message_ptr) { int32 MessagesManager::get_message_date(const tl_object_ptr<telegram_api::Message> &message_ptr) {
switch (message_ptr->get_id()) { switch (message_ptr->get_id()) {
case telegram_api::messageEmpty::ID: case telegram_api::messageEmpty::ID:
@ -22686,19 +22732,16 @@ void MessagesManager::add_paid_message_reaction(MessageFullId message_full_id, i
m->reactions->sort_reactions(active_reaction_pos_); m->reactions->sort_reactions(active_reaction_pos_);
LOG(INFO) << "Update message reactions to " << *m->reactions; LOG(INFO) << "Update message reactions to " << *m->reactions;
pending_reactions_[message_full_id].query_count++;
send_update_message_interaction_info(d->dialog_id, m); send_update_message_interaction_info(d->dialog_id, m);
on_message_changed(d, m, true, "add_paid_message_reaction"); on_message_changed(d, m, true, "add_paid_message_reaction");
// TODO log event auto &task_id = paid_reaction_task_ids_[message_full_id];
int64 random_id = (static_cast<int64>(G()->unix_time()) << 32) | static_cast<int64>(Random::secure_uint32()); if (task_id == 0) {
auto query_promise = PromiseCreator::lambda( task_id = ++paid_reaction_task_id_;
[actor_id = actor_id(this), message_full_id, promise = std::move(promise)](Result<Unit> &&result) mutable { paid_reaction_tasks_[task_id] = message_full_id;
send_closure(actor_id, &MessagesManager::on_set_message_reactions, message_full_id, std::move(result), }
std::move(promise)); send_paid_reactions_timeout_.set_timeout_in(task_id, 6.0);
}); promise.set_value(Unit());
send_paid_message_reaction(td_, message_full_id, narrow_cast<int32>(star_count), random_id, std::move(query_promise));
} }
void MessagesManager::remove_message_reaction(MessageFullId message_full_id, ReactionType reaction_type, void MessagesManager::remove_message_reaction(MessageFullId message_full_id, ReactionType reaction_type,

View File

@ -2121,6 +2121,8 @@ class MessagesManager final : public Actor {
void on_send_update_chat_read_inbox_timeout(DialogId dialog_id); void on_send_update_chat_read_inbox_timeout(DialogId dialog_id);
void on_send_paid_reactions_timeout(int64 task_id);
bool delete_newer_server_messages_at_the_end(Dialog *d, MessageId max_message_id); bool delete_newer_server_messages_at_the_end(Dialog *d, MessageId max_message_id);
template <class T, class It> template <class T, class It>
@ -3049,6 +3051,8 @@ class MessagesManager final : public Actor {
static void on_send_update_chat_read_inbox_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int); static void on_send_update_chat_read_inbox_timeout_callback(void *messages_manager_ptr, int64 dialog_id_int);
static void on_send_paid_reactions_timeout_callback(void *messages_manager_ptr, int64 task_id);
void load_secret_thumbnail(FileId thumbnail_file_id); void load_secret_thumbnail(FileId thumbnail_file_id);
void on_upload_media(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file, void on_upload_media(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file,
@ -3380,6 +3384,7 @@ class MessagesManager final : public Actor {
MultiTimeout preload_folder_dialog_list_timeout_{"PreloadFolderDialogListTimeout"}; MultiTimeout preload_folder_dialog_list_timeout_{"PreloadFolderDialogListTimeout"};
MultiTimeout update_viewed_messages_timeout_{"UpdateViewedMessagesTimeout"}; MultiTimeout update_viewed_messages_timeout_{"UpdateViewedMessagesTimeout"};
MultiTimeout send_update_chat_read_inbox_timeout_{"SendUpdateChatReadInboxTimeout"}; MultiTimeout send_update_chat_read_inbox_timeout_{"SendUpdateChatReadInboxTimeout"};
MultiTimeout send_paid_reactions_timeout_{"SendPaidReactionsTimeout"};
Hints dialogs_hints_; // search dialogs by title and usernames Hints dialogs_hints_; // search dialogs by title and usernames
@ -3514,6 +3519,10 @@ class MessagesManager final : public Actor {
}; };
FlatHashMap<MessageFullId, PendingReaction, MessageFullIdHash> pending_reactions_; FlatHashMap<MessageFullId, PendingReaction, MessageFullIdHash> pending_reactions_;
int64 paid_reaction_task_id_ = 0;
FlatHashMap<MessageFullId, int64, MessageFullIdHash> paid_reaction_task_ids_;
FlatHashMap<int64, MessageFullId> paid_reaction_tasks_;
FlatHashMap<MessageFullId, int32, MessageFullIdHash> pending_read_reactions_; FlatHashMap<MessageFullId, int32, MessageFullIdHash> pending_read_reactions_;
vector<ReactionType> active_reaction_types_; vector<ReactionType> active_reaction_types_;