Fail sending of messages from yet_unsent_media_queues_ on restart.

This commit is contained in:
levlam 2023-05-23 17:01:51 +03:00
parent d01f1ab20a
commit 54e967861c
2 changed files with 28 additions and 9 deletions

View File

@ -13547,6 +13547,17 @@ void MessagesManager::hangup() {
fail_send_message(full_message_id, Global::request_aborted_error()); fail_send_message(full_message_id, Global::request_aborted_error());
} }
} }
while (!yet_unsent_media_queues_.empty()) {
auto it = yet_unsent_media_queues_.begin();
auto queue = std::move(it->second);
yet_unsent_media_queues_.erase(it);
for (auto &promise_it : queue.queue_) {
auto message_id = promise_it.first;
if (message_id.is_yet_unsent()) {
fail_send_message({queue.dialog_id_, message_id}, Global::request_aborted_error());
}
}
}
while (!being_sent_messages_.empty()) { while (!being_sent_messages_.empty()) {
on_send_message_fail(being_sent_messages_.begin()->first, Global::request_aborted_error()); on_send_message_fail(being_sent_messages_.begin()->first, Global::request_aborted_error());
} }
@ -24673,7 +24684,7 @@ void MessagesManager::cancel_send_message_query(DialogId dialog_id, Message *m)
if (queue_id & 1) { if (queue_id & 1) {
auto queue_it = yet_unsent_media_queues_.find(queue_id); auto queue_it = yet_unsent_media_queues_.find(queue_id);
if (queue_it != yet_unsent_media_queues_.end()) { if (queue_it != yet_unsent_media_queues_.end()) {
auto &queue = queue_it->second; auto &queue = queue_it->second.queue_;
LOG(INFO) << "Delete " << m->message_id << " from queue " << queue_id; LOG(INFO) << "Delete " << m->message_id << " from queue " << queue_id;
if (queue.erase(m->message_id) != 0) { if (queue.erase(m->message_id) != 0) {
if (queue.empty()) { if (queue.empty()) {
@ -25827,9 +25838,10 @@ void MessagesManager::on_media_message_ready_to_send(DialogId dialog_id, Message
auto queue_id = ChainId(dialog_id, MessageContentType::Photo).get(); auto queue_id = ChainId(dialog_id, MessageContentType::Photo).get();
CHECK(queue_id & 1); CHECK(queue_id & 1);
auto &queue = yet_unsent_media_queues_[queue_id]; auto &queue = yet_unsent_media_queues_[queue_id];
auto it = queue.find(message_id); queue.dialog_id_ = dialog_id;
if (it == queue.end()) { auto it = queue.queue_.find(message_id);
if (queue.empty()) { if (it == queue.queue_.end()) {
if (queue.queue_.empty()) {
yet_unsent_media_queues_.erase(queue_id); yet_unsent_media_queues_.erase(queue_id);
} }
@ -25857,7 +25869,7 @@ void MessagesManager::on_yet_unsent_media_queue_updated(DialogId dialog_id) {
if (it == yet_unsent_media_queues_.end()) { if (it == yet_unsent_media_queues_.end()) {
return; return;
} }
auto &queue = it->second; auto &queue = it->second.queue_;
if (queue.empty()) { if (queue.empty()) {
yet_unsent_media_queues_.erase(it); yet_unsent_media_queues_.erase(it);
return; return;
@ -32964,8 +32976,9 @@ void MessagesManager::on_send_dialog_action_timeout(DialogId dialog_id) {
pending_send_dialog_action_timeout_.add_timeout_in(dialog_id.get(), 4.0); pending_send_dialog_action_timeout_.add_timeout_in(dialog_id.get(), 4.0);
CHECK(!queue_it->second.empty()); auto &queue = queue_it->second.queue_;
const Message *m = get_message(d, queue_it->second.begin()->first); CHECK(!queue.empty());
const Message *m = get_message(d, queue.begin()->first);
if (m == nullptr) { if (m == nullptr) {
return; return;
} }
@ -34740,7 +34753,9 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
auto queue_id = ChainId(dialog_id, message_content_type).get(); auto queue_id = ChainId(dialog_id, message_content_type).get();
if (queue_id & 1) { if (queue_id & 1) {
LOG(INFO) << "Add " << message_id << " from " << source << " to queue " << queue_id; LOG(INFO) << "Add " << message_id << " from " << source << " to queue " << queue_id;
yet_unsent_media_queues_[queue_id][message_id]; // reserve place for promise auto &queue = yet_unsent_media_queues_[queue_id];
queue.dialog_id_ = dialog_id;
queue.queue_[message_id]; // reserve place for promise
if (!td_->auth_manager_->is_bot()) { if (!td_->auth_manager_->is_bot()) {
pending_send_dialog_action_timeout_.add_timeout_in(dialog_id.get(), 1.0); pending_send_dialog_action_timeout_.add_timeout_in(dialog_id.get(), 1.0);
} }

View File

@ -3570,7 +3570,11 @@ class MessagesManager final : public Actor {
FlatHashMap<DialogId, FlatHashMap<MessageId, int64, MessageIdHash>, DialogIdHash> FlatHashMap<DialogId, FlatHashMap<MessageId, int64, MessageIdHash>, DialogIdHash>
pending_viewed_live_locations_; // ... -> task_id pending_viewed_live_locations_; // ... -> task_id
FlatHashMap<uint64, std::map<MessageId, Promise<Message *>>> yet_unsent_media_queues_; struct UnsentMediaQueue {
DialogId dialog_id_;
std::map<MessageId, Promise<Message *>> queue_;
};
FlatHashMap<uint64, UnsentMediaQueue> yet_unsent_media_queues_;
FlatHashMap<DialogId, NetQueryRef, DialogIdHash> set_typing_query_; FlatHashMap<DialogId, NetQueryRef, DialogIdHash> set_typing_query_;