diff --git a/td/telegram/BusinessConnectionManager.cpp b/td/telegram/BusinessConnectionManager.cpp index 16cb63f41..2b9ac6c56 100644 --- a/td/telegram/BusinessConnectionManager.cpp +++ b/td/telegram/BusinessConnectionManager.cpp @@ -10,6 +10,9 @@ #include "td/telegram/AuthManager.h" #include "td/telegram/ContactsManager.h" #include "td/telegram/DialogManager.h" +#include "td/telegram/files/FileId.h" +#include "td/telegram/files/FileManager.h" +#include "td/telegram/files/FileType.h" #include "td/telegram/Global.h" #include "td/telegram/MessageContent.h" #include "td/telegram/MessageCopyOptions.h" @@ -252,8 +255,111 @@ class BusinessConnectionManager::SendBusinessMediaQuery final : public Td::Resul } }; +class BusinessConnectionManager::UploadBusinessMediaQuery final : public Td::ResultHandler { + Promise promise_; + unique_ptr message_; + bool was_thumbnail_uploaded_ = false; + + void delete_thumbnail() { + if (!was_thumbnail_uploaded_) { + return; + } + + auto file_id = get_message_file_id(message_); + CHECK(file_id.is_valid()); + auto thumbnail_file_id = td_->business_connection_manager_->get_message_thumbnail_file_id(message_, file_id); + CHECK(thumbnail_file_id.is_valid()); + // always delete partial remote location for the thumbnail, because it can't be reused anyway + td_->file_manager_->delete_partial_remote_location(thumbnail_file_id); + } + + public: + explicit UploadBusinessMediaQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(unique_ptr message, telegram_api::object_ptr &&input_media) { + CHECK(input_media != nullptr); + message_ = std::move(message); + CHECK(FileManager::extract_was_uploaded(input_media)); + was_thumbnail_uploaded_ = FileManager::extract_was_thumbnail_uploaded(input_media); + + int32 flags = telegram_api::messages_uploadMedia::BUSINESS_CONNECTION_ID_MASK; + auto input_peer = td_->dialog_manager_->get_input_peer(message_->dialog_id_, AccessRights::Know); + CHECK(input_peer != nullptr); + + send_query(G()->net_query_creator().create(telegram_api::messages_uploadMedia( + flags, message_->business_connection_id_.get(), std::move(input_peer), std::move(input_media)))); + } + + void on_result(BufferSlice packet) final { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(result_ptr.move_as_error()); + } + + delete_thumbnail(); + + auto ptr = result_ptr.move_as_ok(); + LOG(INFO) << "Receive result for UploadBusinessMediaQuery: " << to_string(ptr); + td_->business_connection_manager_->complete_upload_media(std::move(message_), std::move(ptr), std::move(promise_)); + } + + void on_error(Status status) final { + LOG(INFO) << "Receive error for UploadBusinessMediaQuery: " << status; + delete_thumbnail(); + + auto file_id = get_message_file_id(message_); + if (status.code() != 429 && status.code() < 500) { + td_->file_manager_->delete_partial_remote_location(file_id); + } else { + send_closure_later(G()->file_manager(), &FileManager::cancel_upload, file_id); + } + promise_.set_error(std::move(status)); + } +}; + +class BusinessConnectionManager::UploadMediaCallback final : public FileManager::UploadCallback { + public: + void on_upload_ok(FileId file_id, telegram_api::object_ptr input_file) final { + send_closure_later(G()->business_connection_manager(), &BusinessConnectionManager::on_upload_media, file_id, + std::move(input_file)); + } + void on_upload_encrypted_ok(FileId file_id, + telegram_api::object_ptr input_file) final { + UNREACHABLE(); + } + void on_upload_secure_ok(FileId file_id, telegram_api::object_ptr input_file) final { + UNREACHABLE(); + } + void on_upload_error(FileId file_id, Status error) final { + send_closure_later(G()->business_connection_manager(), &BusinessConnectionManager::on_upload_media_error, file_id, + std::move(error)); + } +}; + +class BusinessConnectionManager::UploadThumbnailCallback final : public FileManager::UploadCallback { + public: + void on_upload_ok(FileId file_id, telegram_api::object_ptr input_file) final { + send_closure_later(G()->business_connection_manager(), &BusinessConnectionManager::on_upload_thumbnail, file_id, + std::move(input_file)); + } + void on_upload_encrypted_ok(FileId file_id, + telegram_api::object_ptr input_file) final { + UNREACHABLE(); + } + void on_upload_secure_ok(FileId file_id, telegram_api::object_ptr input_file) final { + UNREACHABLE(); + } + void on_upload_error(FileId file_id, Status error) final { + send_closure_later(G()->business_connection_manager(), &BusinessConnectionManager::on_upload_thumbnail, file_id, + nullptr); + } +}; + BusinessConnectionManager::BusinessConnectionManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) { + upload_media_callback_ = std::make_shared(); + upload_thumbnail_callback_ = std::make_shared(); } BusinessConnectionManager::~BusinessConnectionManager() = default; @@ -514,7 +620,7 @@ void BusinessConnectionManager::send_message(BusinessConnectionId business_conne process_input_message_content(dialog_id, std::move(input_message_content))); auto input_reply_to = create_business_message_input_reply_to(std::move(reply_to)); TRY_RESULT_PROMISE(promise, message_reply_markup, - get_reply_markup(std::move(reply_markup), DialogType::User, td_->auth_manager_->is_bot(), false)); + get_reply_markup(std::move(reply_markup), DialogType::User, true, false)); auto message = create_business_message_to_send(std::move(business_connection_id), dialog_id, std::move(input_reply_to), disable_notification, protect_content, @@ -527,7 +633,7 @@ void BusinessConnectionManager::do_send_message(unique_ptr &&mes Promise> &&promise) { LOG(INFO) << "Send business message to " << message->dialog_id_; - auto content = message->content_.get(); + const auto *content = message->content_.get(); CHECK(content != nullptr); auto content_type = content->get_type(); if (content_type == MessageContentType::Text) { @@ -545,7 +651,223 @@ void BusinessConnectionManager::do_send_message(unique_ptr &&mes td_->create_handler(std::move(promise))->send(std::move(message), std::move(input_media)); return; } - promise.set_error(Status::Error(400, "Unsupported")); + upload_media(std::move(message), PromiseCreator::lambda([actor_id = actor_id(this), promise = std::move(promise)]( + Result &&result) mutable { + if (result.is_error()) { + return promise.set_error(result.move_as_error()); + } + auto message_input_media = result.move_as_ok(); + send_closure(actor_id, &BusinessConnectionManager::complete_send_media, + std::move(message_input_media.message_), std::move(message_input_media.input_media_), + std::move(promise)); + })); +} + +FileId BusinessConnectionManager::get_message_file_id(const unique_ptr &message) { + CHECK(message != nullptr); + return get_message_content_any_file_id( + message->content_.get()); // any_file_id, because it could be a photo sent by ID +} + +FileId BusinessConnectionManager::get_message_thumbnail_file_id(const unique_ptr &message, + FileId file_id) const { + FileView file_view = td_->file_manager_->get_file_view(file_id); + if (get_main_file_type(file_view.get_type()) == FileType::Photo) { + return FileId(); + } + CHECK(message != nullptr); + return get_message_content_thumbnail_file_id(message->content_.get(), td_); +} + +void BusinessConnectionManager::upload_media(unique_ptr &&message, Promise &&promise, + vector bad_parts) { + auto content_type = message->content_->get_type(); + if (content_type == MessageContentType::Game || content_type == MessageContentType::Poll || + content_type == MessageContentType::Story) { + return promise.set_error(Status::Error(400, "Message has no file")); + } + + BeingUploadedMedia media; + media.message_ = std::move(message); + media.promise_ = std::move(promise); + + auto file_id = get_message_file_id(media.message_); + LOG(INFO) << "Ask to upload file " << file_id << " with bad parts " << bad_parts; + CHECK(file_id.is_valid()); + bool is_inserted = being_uploaded_files_.emplace(file_id, std::move(media)).second; + CHECK(is_inserted); + // need to call resume_upload synchronously to make upload process consistent with being_uploaded_files_ + // and to send is_uploading_active == true in the updates + td_->file_manager_->resume_upload(file_id, std::move(bad_parts), upload_media_callback_, 1, 0); +} + +void BusinessConnectionManager::complete_send_media(unique_ptr &&message, + telegram_api::object_ptr &&input_media, + Promise> &&promise) { + TRY_STATUS_PROMISE(promise, G()->close_status()); + CHECK(message != nullptr); + CHECK(input_media != nullptr); + td_->create_handler(std::move(promise))->send(std::move(message), std::move(input_media)); +} + +void BusinessConnectionManager::on_upload_media(FileId file_id, + telegram_api::object_ptr input_file) { + LOG(INFO) << "File " << file_id << " has been uploaded"; + + auto it = being_uploaded_files_.find(file_id); + CHECK(it != being_uploaded_files_.end()); + auto being_uploaded_media = std::move(it->second); + being_uploaded_files_.erase(it); + + being_uploaded_media.input_file_ = std::move(input_file); + auto thumbnail_file_id = get_message_thumbnail_file_id(being_uploaded_media.message_, file_id); + if (being_uploaded_media.input_file_ != nullptr && thumbnail_file_id.is_valid()) { + // TODO: download thumbnail if needed (like in secret chats) + LOG(INFO) << "Ask to upload thumbnail " << thumbnail_file_id; + bool is_inserted = being_uploaded_thumbnails_.emplace(thumbnail_file_id, std::move(being_uploaded_media)).second; + CHECK(is_inserted); + td_->file_manager_->upload(thumbnail_file_id, upload_thumbnail_callback_, 1, 0); + } else { + do_upload_media(std::move(being_uploaded_media), nullptr); + } +} + +void BusinessConnectionManager::on_upload_media_error(FileId file_id, Status status) { + CHECK(status.is_error()); + + auto it = being_uploaded_files_.find(file_id); + CHECK(it != being_uploaded_files_.end()); + auto being_uploaded_media = std::move(it->second); + being_uploaded_files_.erase(it); + + being_uploaded_media.promise_.set_error(std::move(status)); +} + +void BusinessConnectionManager::on_upload_thumbnail( + FileId thumbnail_file_id, telegram_api::object_ptr thumbnail_input_file) { + LOG(INFO) << "Thumbnail " << thumbnail_file_id << " has been uploaded as " << to_string(thumbnail_input_file); + + auto it = being_uploaded_thumbnails_.find(thumbnail_file_id); + CHECK(it != being_uploaded_thumbnails_.end()); + auto being_uploaded_media = std::move(it->second); + being_uploaded_thumbnails_.erase(it); + + if (thumbnail_input_file == nullptr) { + delete_message_content_thumbnail(being_uploaded_media.message_->content_.get(), td_); + } + + do_upload_media(std::move(being_uploaded_media), std::move(thumbnail_input_file)); +} + +void BusinessConnectionManager::do_upload_media(BeingUploadedMedia &&being_uploaded_media, + telegram_api::object_ptr input_thumbnail) { + auto file_id = get_message_file_id(being_uploaded_media.message_); + auto thumbnail_file_id = get_message_thumbnail_file_id(being_uploaded_media.message_, file_id); + auto input_file = std::move(being_uploaded_media.input_file_); + bool have_input_file = input_file != nullptr; + bool have_input_thumbnail = input_thumbnail != nullptr; + LOG(INFO) << "Do upload media file " << file_id << " with thumbnail " << thumbnail_file_id + << ", have_input_file = " << have_input_file << ", have_input_thumbnail = " << have_input_thumbnail; + + const auto *message = being_uploaded_media.message_.get(); + auto input_media = get_input_media(message->content_.get(), td_, std::move(input_file), std::move(input_thumbnail), + file_id, thumbnail_file_id, message->ttl_, message->send_emoji_, true); + CHECK(input_media != nullptr); + auto input_media_id = input_media->get_id(); + if (!have_input_file || input_media_id == telegram_api::inputMediaDocument::ID || + input_media_id == telegram_api::inputMediaPhoto::ID || + input_media_id == telegram_api::inputMediaDocumentExternal::ID || + input_media_id == telegram_api::inputMediaPhotoExternal::ID) { + // can use input media directly + UploadMediaResult result; + result.message_ = std::move(being_uploaded_media.message_); + result.input_media_ = std::move(input_media); + return being_uploaded_media.promise_.set_value(std::move(result)); + } + + switch (input_media->get_id()) { + case telegram_api::inputMediaUploadedDocument::ID: + if (message->content_->get_type() != MessageContentType::Animation) { + static_cast(input_media.get())->flags_ |= + telegram_api::inputMediaUploadedDocument::NOSOUND_VIDEO_MASK; + } + // fallthrough + case telegram_api::inputMediaUploadedPhoto::ID: + td_->create_handler(std::move(being_uploaded_media.promise_)) + ->send(std::move(being_uploaded_media.message_), std::move(input_media)); + break; + default: + LOG(ERROR) << "Have wrong input media " << to_string(input_media); + being_uploaded_media.promise_.set_error(Status::Error(400, "Invalid input media")); + } +} + +void BusinessConnectionManager::complete_upload_media(unique_ptr &&message, + telegram_api::object_ptr &&media, + Promise &&promise) { + auto *content = message->content_.get(); + auto *caption = get_message_content_caption(content); + auto has_spoiler = get_message_content_has_spoiler(content); + auto new_content = get_message_content(td_, caption == nullptr ? FormattedText() : *caption, std::move(media), + td_->dialog_manager_->get_my_dialog_id(), G()->unix_time(), false, UserId(), + nullptr, nullptr, "complete_upload_media"); + set_message_content_has_spoiler(new_content.get(), has_spoiler); + + bool is_content_changed = false; + bool need_update = false; + + unique_ptr &old_content = message->content_; + MessageContentType old_content_type = old_content->get_type(); + MessageContentType new_content_type = new_content->get_type(); + + auto old_file_id = get_message_file_id(message); + if (old_content_type != new_content_type) { + need_update = true; + + auto new_file_id = get_message_content_any_file_id(new_content.get()); + if (new_file_id.is_valid()) { + FileView old_file_view = td_->file_manager_->get_file_view(old_file_id); + FileView new_file_view = td_->file_manager_->get_file_view(new_file_id); + // if file type has changed, but file size remains the same, we are trying to update local location of the new + // file with the old local location + if (old_file_view.has_local_location() && !new_file_view.has_local_location() && old_file_view.size() != 0 && + old_file_view.size() == new_file_view.size()) { + auto old_file_type = old_file_view.get_type(); + auto new_file_type = new_file_view.get_type(); + + if (is_document_file_type(old_file_type) && is_document_file_type(new_file_type)) { + auto &old_location = old_file_view.local_location(); + auto r_file_id = td_->file_manager_->register_local( + FullLocalFileLocation(new_file_type, old_location.path_, old_location.mtime_nsec_), DialogId(), + old_file_view.size()); + if (r_file_id.is_ok()) { + LOG_STATUS(td_->file_manager_->merge(new_file_id, r_file_id.ok())); + } + } + } + } + } else { + merge_message_contents(td_, old_content.get(), new_content.get(), false, DialogId(), true, is_content_changed, + need_update); + compare_message_contents(td_, old_content.get(), new_content.get(), is_content_changed, need_update); + } + send_closure_later(G()->file_manager(), &FileManager::cancel_upload, old_file_id); + + if (is_content_changed || need_update) { + old_content = std::move(new_content); + update_message_content_file_id_remote(old_content.get(), old_file_id); + } else { + update_message_content_file_id_remote(old_content.get(), get_message_content_any_file_id(new_content.get())); + } + + auto input_media = get_input_media(message->content_.get(), td_, message->ttl_, message->send_emoji_, true); + if (input_media == nullptr) { + return promise.set_error(Status::Error(400, "Failed to upload file")); + } + UploadMediaResult result; + result.message_ = std::move(message); + result.input_media_ = std::move(input_media); + promise.set_value(std::move(result)); } } // namespace td diff --git a/td/telegram/BusinessConnectionManager.h b/td/telegram/BusinessConnectionManager.h index 92207f8ed..a8e37f1ac 100644 --- a/td/telegram/BusinessConnectionManager.h +++ b/td/telegram/BusinessConnectionManager.h @@ -8,6 +8,7 @@ #include "td/telegram/BusinessConnectionId.h" #include "td/telegram/DialogId.h" +#include "td/telegram/files/FileId.h" #include "td/telegram/MessageInputReplyTo.h" #include "td/telegram/net/DcId.h" #include "td/telegram/td_api.h" @@ -16,10 +17,13 @@ #include "td/actor/actor.h" #include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" #include "td/utils/Promise.h" #include "td/utils/Status.h" #include "td/utils/WaitFreeHashMap.h" +#include + namespace td { struct InputMessageContent; @@ -64,6 +68,20 @@ class BusinessConnectionManager final : public Actor { struct PendingMessage; class SendBusinessMessageQuery; class SendBusinessMediaQuery; + class UploadBusinessMediaQuery; + class UploadMediaCallback; + class UploadThumbnailCallback; + + struct UploadMediaResult { + unique_ptr message_; + telegram_api::object_ptr input_media_; + }; + + struct BeingUploadedMedia { + unique_ptr message_; + telegram_api::object_ptr input_file_; + Promise promise_; + }; void tear_down() final; @@ -84,12 +102,43 @@ class BusinessConnectionManager final : public Actor { void do_send_message(unique_ptr &&message, Promise> &&promise); + static FileId get_message_file_id(const unique_ptr &message); + + FileId get_message_thumbnail_file_id(const unique_ptr &message, FileId file_id) const; + + void upload_media(unique_ptr &&message, Promise &&promise, + vector bad_parts = {}); + + void complete_send_media(unique_ptr &&message, + telegram_api::object_ptr &&input_media, + Promise> &&promise); + + void on_upload_media(FileId file_id, telegram_api::object_ptr input_file); + + void on_upload_media_error(FileId file_id, Status status); + + void on_upload_thumbnail(FileId thumbnail_file_id, + telegram_api::object_ptr thumbnail_input_file); + + void do_upload_media(BeingUploadedMedia &&being_uploaded_media, + telegram_api::object_ptr input_thumbnail); + + void complete_upload_media(unique_ptr &&message, + telegram_api::object_ptr &&media, + Promise &&promise); + WaitFreeHashMap, BusinessConnectionIdHash> business_connections_; FlatHashMap>>, BusinessConnectionIdHash> get_business_connection_queries_; + std::shared_ptr upload_media_callback_; + std::shared_ptr upload_thumbnail_callback_; + + FlatHashMap being_uploaded_files_; + FlatHashMap being_uploaded_thumbnails_; + Td *td_; ActorShared<> parent_; }; diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 6c5d409e0..3a5e1a2c8 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -8147,7 +8147,7 @@ void MessagesManager::do_send_media(DialogId dialog_id, Message *m, FileId file_ << ", have_input_file = " << have_input_file << ", have_input_thumbnail = " << have_input_thumbnail << ", self-destruct time = " << m->ttl; - MessageContent *content = nullptr; + const MessageContent *content = nullptr; if (m->message_id.is_any_server()) { content = m->edited_content.get(); if (content == nullptr) {