From f307200ddc8dd6afb981d76cf4fd433d09665eda Mon Sep 17 00:00:00 2001 From: levlam Date: Fri, 22 Jan 2021 17:09:08 +0300 Subject: [PATCH] Support attachment upload in importMessages. --- td/telegram/AnimationsManager.cpp | 2 +- td/telegram/MessageContent.cpp | 34 +++++ td/telegram/MessageContent.h | 3 + td/telegram/MessagesManager.cpp | 216 +++++++++++++++++++++++++++++- td/telegram/MessagesManager.h | 38 +++++- 5 files changed, 283 insertions(+), 10 deletions(-) diff --git a/td/telegram/AnimationsManager.cpp b/td/telegram/AnimationsManager.cpp index 32401f987..b2d20b24d 100644 --- a/td/telegram/AnimationsManager.cpp +++ b/td/telegram/AnimationsManager.cpp @@ -391,7 +391,7 @@ tl_object_ptr AnimationsManager::get_input_media( } return make_tl_object( flags, false /*ignored*/, false /*ignored*/, std::move(input_file), std::move(input_thumbnail), mime_type, - std::move(attributes), vector>(), 0); + std::move(attributes), std::move(added_stickers), 0); } else { CHECK(!file_view.has_remote_location()); } diff --git a/td/telegram/MessageContent.cpp b/td/telegram/MessageContent.cpp index d10679aeb..a124dafba 100644 --- a/td/telegram/MessageContent.cpp +++ b/td/telegram/MessageContent.cpp @@ -2496,6 +2496,40 @@ tl_object_ptr get_input_media(const MessageContent *co return input_media; } +tl_object_ptr get_fake_input_media(Td *td, tl_object_ptr input_file, + FileId file_id) { + FileView file_view = td->file_manager_->get_file_view(file_id); + auto file_type = file_view.get_type(); + switch (file_type) { + case FileType::Animation: + case FileType::Audio: + case FileType::Document: + case FileType::Sticker: + case FileType::Video: + case FileType::VoiceNote: { + vector> attributes; + auto file_name = file_view.suggested_name(); + if (!file_name.empty()) { + attributes.push_back(make_tl_object(file_name)); + } + string mime_type = MimeType::from_extension(PathView(file_name).extension()); + int32 flags = 0; + if (file_type == FileType::Video) { + flags |= telegram_api::inputMediaUploadedDocument::NOSOUND_VIDEO_MASK; + } + return make_tl_object( + flags, false /*ignored*/, false /*ignored*/, std::move(input_file), nullptr, mime_type, std::move(attributes), + vector>(), 0); + } + case FileType::Photo: + return make_tl_object( + 0, std::move(input_file), vector>(), 0); + default: + UNREACHABLE(); + } + return nullptr; +} + void delete_message_content_thumbnail(MessageContent *content, Td *td) { switch (content->get_type()) { case MessageContentType::Animation: { diff --git a/td/telegram/MessageContent.h b/td/telegram/MessageContent.h index c04038a6c..398d2d4c4 100644 --- a/td/telegram/MessageContent.h +++ b/td/telegram/MessageContent.h @@ -117,6 +117,9 @@ tl_object_ptr get_input_media(const MessageContent *co tl_object_ptr get_input_media(const MessageContent *content, Td *td, int32 ttl, const string &emoji, bool force); +tl_object_ptr get_fake_input_media(Td *td, tl_object_ptr input_file, + FileId file_id); + void delete_message_content_thumbnail(MessageContent *content, Td *td); bool can_forward_message_content(const MessageContent *content); diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 89624e62b..f15649056 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -1025,7 +1025,6 @@ class InitHistoryImportQuery : public Td::ResultHandler { file_id_ = file_id; dialog_id_ = dialog_id; attached_file_ids_ = std::move(attached_file_ids); - attached_file_ids_.clear(); auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Write); CHECK(input_peer != nullptr); @@ -1047,18 +1046,73 @@ class InitHistoryImportQuery : public Td::ResultHandler { } void on_error(uint64 id, Status status) override { - td->file_manager_->delete_partial_remote_location(file_id_); if (!td->auth_manager_->is_bot() && FileReferenceManager::is_file_reference_error(status)) { LOG(ERROR) << "Receive file reference error " << status; } + if (begins_with(status.message(), "FILE_PART_") && ends_with(status.message(), "_MISSING")) { + // TODO support FILE_PART_*_MISSING + } - // TODO support FILE_PART_*_MISSING + td->file_manager_->delete_partial_remote_location(file_id_); td->messages_manager_->on_get_dialog_error(dialog_id_, status, "InitHistoryImportQuery"); promise_.set_error(std::move(status)); } }; +class UploadImportedMediaQuery : public Td::ResultHandler { + Promise promise_; + DialogId dialog_id_; + int64 import_id_; + FileId file_id_; + + public: + explicit UploadImportedMediaQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id, int64 import_id, const string &file_name, FileId file_id, + tl_object_ptr &&input_media) { + CHECK(input_media != nullptr); + dialog_id_ = dialog_id; + import_id_ = import_id; + file_id_ = file_id; + + auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Write); + if (input_peer == nullptr) { + return on_error(0, Status::Error(400, "Have no write access to the chat")); + } + + send_query(G()->net_query_creator().create(telegram_api::messages_uploadImportedMedia( + std::move(input_peer), import_id, file_name, std::move(input_media)))); + } + + void on_result(uint64 id, BufferSlice packet) override { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(id, result_ptr.move_as_error()); + } + + td->file_manager_->delete_partial_remote_location(file_id_); + + // ignore response + + promise_.set_value(Unit()); + } + + void on_error(uint64 id, Status status) override { + if (FileReferenceManager::is_file_reference_error(status)) { + LOG(ERROR) << "Receive file reference error " << status; + } + if (begins_with(status.message(), "FILE_PART_") && ends_with(status.message(), "_MISSING")) { + // TODO support FILE_PART_*_MISSING + } + + td->file_manager_->delete_partial_remote_location(file_id_); + td->messages_manager_->on_get_dialog_error(dialog_id_, status, "UploadImportedMediaQuery"); + promise_.set_error(std::move(status)); + } +}; + class StartImportHistoryQuery : public Td::ResultHandler { Promise promise_; DialogId dialog_id_; @@ -4683,6 +4737,24 @@ class MessagesManager::UploadImportedMessagesCallback : public FileManager::Uplo } }; +class MessagesManager::UploadImportedMessageAttachmentCallback : public FileManager::UploadCallback { + public: + void on_upload_ok(FileId file_id, tl_object_ptr input_file) override { + send_closure_later(G()->messages_manager(), &MessagesManager::on_upload_imported_message_attachment, file_id, + std::move(input_file)); + } + void on_upload_encrypted_ok(FileId file_id, tl_object_ptr input_file) override { + UNREACHABLE(); + } + void on_upload_secure_ok(FileId file_id, tl_object_ptr input_file) override { + UNREACHABLE(); + } + void on_upload_error(FileId file_id, Status error) override { + send_closure_later(G()->messages_manager(), &MessagesManager::on_upload_imported_message_attachment_error, file_id, + std::move(error)); + } +}; + template void MessagesManager::Message::store(StorerT &storer) const { using td::store; @@ -5618,6 +5690,7 @@ MessagesManager::MessagesManager(Td *td, ActorShared<> parent) : td_(td), parent upload_thumbnail_callback_ = std::make_shared(); upload_dialog_photo_callback_ = std::make_shared(); upload_imported_messages_callback_ = std::make_shared(); + upload_imported_message_attachment_callback_ = std::make_shared(); channel_get_difference_timeout_.set_callback(on_channel_get_difference_timeout_callback); channel_get_difference_timeout_.set_callback_data(static_cast(this)); @@ -8686,7 +8759,7 @@ void MessagesManager::on_upload_imported_messages(FileId file_id, tl_object_ptr< CHECK(!file_view.is_encrypted()); if (input_file == nullptr && file_view.has_remote_location()) { if (file_view.main_remote_location().is_web()) { - return promise.set_error(Status::Error(400, "Can't use web photo")); + return promise.set_error(Status::Error(400, "Can't use web file")); } if (is_reupload) { return promise.set_error(Status::Error(400, "Failed to reupload the file")); @@ -8727,6 +8800,72 @@ void MessagesManager::on_upload_imported_messages_error(FileId file_id, Status s promise.set_error(std::move(status)); } +void MessagesManager::on_upload_imported_message_attachment(FileId file_id, + tl_object_ptr input_file) { + LOG(INFO) << "File " << file_id << " has been uploaded"; + + auto it = being_uploaded_imported_message_attachments_.find(file_id); + if (it == being_uploaded_imported_message_attachments_.end()) { + // just in case, as in on_upload_media + return; + } + + CHECK(it->second != nullptr); + DialogId dialog_id = it->second->dialog_id; + int64 import_id = it->second->import_id; + bool is_reupload = it->second->is_reupload; + Promise promise = std::move(it->second->promise); + + being_uploaded_imported_message_attachments_.erase(it); + + FileView file_view = td_->file_manager_->get_file_view(file_id); + CHECK(!file_view.is_encrypted()); + if (input_file == nullptr && file_view.has_remote_location()) { + if (file_view.main_remote_location().is_web()) { + return promise.set_error(Status::Error(400, "Can't use web file")); + } + if (is_reupload) { + return promise.set_error(Status::Error(400, "Failed to reupload the file")); + } + + // delete file reference and forcely reupload the file + auto file_reference = + file_view.get_type() == FileType::Photo + ? FileManager::extract_file_reference(file_view.main_remote_location().as_input_photo()) + : FileManager::extract_file_reference(file_view.main_remote_location().as_input_document()); + td_->file_manager_->delete_file_reference(file_id, file_reference); + upload_imported_message_attachment(dialog_id, import_id, file_id, true, std::move(promise), {-1}); + return; + } + CHECK(input_file != nullptr); + + td_->create_handler(std::move(promise)) + ->send(dialog_id, import_id, file_view.suggested_name(), file_id, + get_fake_input_media(td_, std::move(input_file), file_id)); +} + +void MessagesManager::on_upload_imported_message_attachment_error(FileId file_id, Status status) { + if (G()->close_flag()) { + // do not fail upload if closing + return; + } + + LOG(INFO) << "File " << file_id << " has upload error " << status; + CHECK(status.is_error()); + + auto it = being_uploaded_imported_message_attachments_.find(file_id); + if (it == being_uploaded_imported_message_attachments_.end()) { + // just in case, as in on_upload_media_error + return; + } + + Promise promise = std::move(it->second->promise); + + being_uploaded_imported_message_attachments_.erase(it); + + promise.set_error(std::move(status)); +} + void MessagesManager::before_get_difference() { running_get_difference_ = true; @@ -26341,14 +26480,79 @@ void MessagesManager::upload_imported_messages(DialogId dialog_id, FileId file_i void MessagesManager::start_import_messages(DialogId dialog_id, int64 import_id, vector &&attached_file_ids, Promise &&promise) { - CHECK(attached_file_ids.empty()); if (G()->close_flag()) { return promise.set_error(Status::Error(500, "Request aborted")); } TRY_STATUS_PROMISE(promise, can_send_message(dialog_id)); - td_->create_handler(std::move(promise))->send(dialog_id, import_id); + auto pending_message_import = make_unique(); + pending_message_import->dialog_id = dialog_id; + pending_message_import->import_id = import_id; + pending_message_import->promise = std::move(promise); + + auto &multipromise = pending_message_import->upload_files_multipromise; + + int64 random_id; + do { + random_id = Random::secure_int64(); + } while (random_id == 0 || pending_message_imports_.find(random_id) != pending_message_imports_.end()); + pending_message_imports_[random_id] = std::move(pending_message_import); + + multipromise.add_promise(PromiseCreator::lambda([random_id](Result result) { + send_closure_later(G()->messages_manager(), &MessagesManager::on_imported_message_attachments_uploaded, random_id, + std::move(result)); + })); + auto lock_promise = multipromise.get_promise(); + + for (auto attached_file_id : attached_file_ids) { + upload_imported_message_attachment(dialog_id, import_id, td_->file_manager_->dup_file_id(attached_file_id), false, + multipromise.get_promise()); + } + + lock_promise.set_value(Unit()); +} + +void MessagesManager::upload_imported_message_attachment(DialogId dialog_id, int64 import_id, FileId file_id, + bool is_reupload, Promise &&promise, + vector bad_parts) { + CHECK(file_id.is_valid()); + LOG(INFO) << "Ask to upload improted message attached file " << file_id; + CHECK(being_uploaded_imported_message_attachments_.find(file_id) == + being_uploaded_imported_message_attachments_.end()); + being_uploaded_imported_message_attachments_.emplace( + file_id, + td::make_unique(dialog_id, import_id, is_reupload, std::move(promise))); + // TODO use force_reupload if is_reupload + td_->file_manager_->resume_upload(file_id, std::move(bad_parts), upload_imported_message_attachment_callback_, 1, 0); +} + +void MessagesManager::on_imported_message_attachments_uploaded(int64 random_id, Result &&result) { + if (G()->close_flag()) { + result = Status::Error(500, "Request aborted"); + } + + auto it = pending_message_imports_.find(random_id); + CHECK(it != pending_message_imports_.end()); + + auto pending_message_import = std::move(it->second); + CHECK(pending_message_import != nullptr); + + pending_message_imports_.erase(it); + + if (result.is_error()) { + pending_message_import->promise.set_error(result.move_as_error()); + return; + } + + CHECK(pending_message_import->upload_files_multipromise.promise_count() == 0); + + auto promise = std::move(pending_message_import->promise); + auto dialog_id = pending_message_import->dialog_id; + + TRY_STATUS_PROMISE(promise, can_send_message(dialog_id)); + + td_->create_handler(std::move(promise))->send(dialog_id, pending_message_import->import_id); } bool MessagesManager::on_update_message_id(int64 random_id, MessageId new_message_id, const string &source) { diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 16b94d513..c5bb5d49e 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -896,9 +896,6 @@ class MessagesManager : public Actor { void upload_dialog_photo(DialogId dialog_id, FileId file_id, bool is_animation, double main_frame_timestamp, bool is_reupload, Promise &&promise, vector bad_parts = {}); - void upload_imported_messages(DialogId dialog_id, FileId file_id, vector attached_file_ids, bool is_reupload, - Promise &&promise, vector bad_parts = {}); - void on_binlog_events(vector &&events); void set_poll_answer(FullMessageId full_message_id, vector &&option_ids, Promise &&promise); @@ -2833,9 +2830,20 @@ class MessagesManager : public Actor { tl_object_ptr &&input_chat_photo, Promise &&promise); + void upload_imported_messages(DialogId dialog_id, FileId file_id, vector attached_file_ids, bool is_reupload, + Promise &&promise, vector bad_parts = {}); + void on_upload_imported_messages(FileId file_id, tl_object_ptr input_file); void on_upload_imported_messages_error(FileId file_id, Status status); + void upload_imported_message_attachment(DialogId dialog_id, int64 import_id, FileId file_id, bool is_reupload, + Promise &&promise, vector bad_parts = {}); + + void on_upload_imported_message_attachment(FileId file_id, tl_object_ptr input_file); + void on_upload_imported_message_attachment_error(FileId file_id, Status status); + + void on_imported_message_attachments_uploaded(int64 random_id, Result &&result); + void add_sponsored_dialog(const Dialog *d, DialogSource source); void save_sponsored_dialog(); @@ -2969,11 +2977,13 @@ class MessagesManager : public Actor { class UploadThumbnailCallback; class UploadDialogPhotoCallback; class UploadImportedMessagesCallback; + class UploadImportedMessageAttachmentCallback; std::shared_ptr upload_media_callback_; std::shared_ptr upload_thumbnail_callback_; std::shared_ptr upload_dialog_photo_callback_; std::shared_ptr upload_imported_messages_callback_; + std::shared_ptr upload_imported_message_attachment_callback_; double last_channel_pts_jump_warning_time_ = 0; @@ -3072,6 +3082,28 @@ class MessagesManager : public Actor { }; std::unordered_map, FileIdHash> being_uploaded_imported_messages_; + struct UploadedImportedMessageAttachmentInfo { + DialogId dialog_id; + int64 import_id; + bool is_reupload; + Promise promise; + + UploadedImportedMessageAttachmentInfo(DialogId dialog_id, int64 import_id, bool is_reupload, + Promise &&promise) + : dialog_id(dialog_id), import_id(import_id), is_reupload(is_reupload), promise(std::move(promise)) { + } + }; + std::unordered_map, FileIdHash> + being_uploaded_imported_message_attachments_; + + struct PendingMessageImport { + MultiPromiseActor upload_files_multipromise{"UploadAttachedFilesMultiPromiseActor"}; + DialogId dialog_id; + int64 import_id = 0; + Promise promise; + }; + std::unordered_map> pending_message_imports_; + struct PendingMessageGroupSend { DialogId dialog_id; size_t finished_count = 0;