From 30e4e8cecb10147d240497f50e1af07da831cba2 Mon Sep 17 00:00:00 2001 From: levlam Date: Fri, 12 Jul 2024 17:27:26 +0300 Subject: [PATCH] Add separate FileUploadManager. --- CMakeLists.txt | 2 + td/telegram/files/FileLoadManager.cpp | 103 ------------- td/telegram/files/FileLoadManager.h | 56 +------ td/telegram/files/FileManager.cpp | 85 ++++++---- td/telegram/files/FileManager.h | 69 +++++---- td/telegram/files/FileUploadManager.cpp | 196 ++++++++++++++++++++++++ td/telegram/files/FileUploadManager.h | 121 +++++++++++++++ 7 files changed, 423 insertions(+), 209 deletions(-) create mode 100644 td/telegram/files/FileUploadManager.cpp create mode 100644 td/telegram/files/FileUploadManager.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b29137d7f..14b1ff2f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -415,6 +415,7 @@ set(TDLIB_SOURCE_PART1 td/telegram/files/FileStatsWorker.cpp td/telegram/files/FileType.cpp td/telegram/files/FileUploader.cpp + td/telegram/files/FileUploadManager.cpp td/telegram/files/PartsManager.cpp td/telegram/files/ResourceManager.cpp td/telegram/ForumTopic.cpp @@ -714,6 +715,7 @@ set(TDLIB_SOURCE_PART2 td/telegram/files/FileStatsWorker.h td/telegram/files/FileType.h td/telegram/files/FileUploader.h + td/telegram/files/FileUploadManager.h td/telegram/files/PartsManager.h td/telegram/files/ResourceManager.h td/telegram/files/ResourceState.h diff --git a/td/telegram/files/FileLoadManager.cpp b/td/telegram/files/FileLoadManager.cpp index 2dc625545..1ff786630 100644 --- a/td/telegram/files/FileLoadManager.cpp +++ b/td/telegram/files/FileLoadManager.cpp @@ -24,10 +24,6 @@ FileLoadManager::FileLoadManager(unique_ptr callback, ActorShared<> pa } void FileLoadManager::start_up() { - constexpr int64 MAX_UPLOAD_RESOURCE_LIMIT = 4 << 20; - upload_resource_manager_ = create_actor( - "UploadResourceManager", MAX_UPLOAD_RESOURCE_LIMIT, - !G()->keep_media_order() ? ResourceManager::Mode::Greedy : ResourceManager::Mode::Baseline); if (G()->get_option_boolean("is_premium")) { max_download_resource_limit_ *= 8; } @@ -67,42 +63,6 @@ void FileLoadManager::download(QueryId query_id, const FullRemoteFileLocation &r CHECK(is_inserted); } -void FileLoadManager::upload(QueryId query_id, const LocalFileLocation &local_location, - const RemoteFileLocation &remote_location, int64 expected_size, - const FileEncryptionKey &encryption_key, int8 priority, vector bad_parts) { - if (stop_flag_) { - return; - } - NodeId node_id = nodes_container_.create(Node()); - Node *node = nodes_container_.get(node_id); - CHECK(node); - node->query_id_ = query_id; - auto callback = make_unique(actor_shared(this, node_id)); - node->loader_ = create_actor("Uploader", local_location, remote_location, expected_size, encryption_key, - std::move(bad_parts), std::move(callback)); - send_closure(upload_resource_manager_, &ResourceManager::register_worker, - ActorShared(node->loader_.get(), static_cast(-1)), priority); - bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second; - CHECK(is_inserted); -} - -void FileLoadManager::upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, - int8 priority) { - if (stop_flag_) { - return; - } - NodeId node_id = nodes_container_.create(Node()); - Node *node = nodes_container_.get(node_id); - CHECK(node); - node->query_id_ = query_id; - auto callback = make_unique(actor_shared(this, node_id)); - node->loader_ = create_actor("HashUploader", local_location, size, std::move(callback)); - send_closure(upload_resource_manager_, &ResourceManager::register_worker, - ActorShared(node->loader_.get(), static_cast(-1)), priority); - bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second; - CHECK(is_inserted); -} - void FileLoadManager::update_priority(QueryId query_id, int8 priority) { if (stop_flag_) { return; @@ -171,21 +131,6 @@ void FileLoadManager::cancel(QueryId query_id) { on_error_impl(it->second, Status::Error(-1, "Canceled")); } -void FileLoadManager::update_local_file_location(QueryId query_id, const LocalFileLocation &local) { - if (stop_flag_) { - return; - } - auto it = query_id_to_node_id_.find(query_id); - if (it == query_id_to_node_id_.end()) { - return; - } - auto node = nodes_container_.get(it->second); - if (node == nullptr) { - return; - } - send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local); -} - void FileLoadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) { if (stop_flag_) { return; @@ -229,28 +174,6 @@ void FileLoadManager::on_partial_download(PartialLocalFileLocation partial_local } } -void FileLoadManager::on_hash(string hash) { - auto node_id = get_link_token(); - auto node = nodes_container_.get(node_id); - if (node == nullptr) { - return; - } - if (!stop_flag_) { - callback_->on_hash(node->query_id_, std::move(hash)); - } -} - -void FileLoadManager::on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) { - auto node_id = get_link_token(); - auto node = nodes_container_.get(node_id); - if (node == nullptr) { - return; - } - if (!stop_flag_) { - callback_->on_partial_upload(node->query_id_, std::move(partial_remote), ready_size); - } -} - void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bool is_new) { auto node_id = get_link_token(); auto node = nodes_container_.get(node_id); @@ -264,32 +187,6 @@ void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bo loop(); } -void FileLoadManager::on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size) { - auto node_id = get_link_token(); - auto node = nodes_container_.get(node_id); - if (node == nullptr) { - return; - } - if (!stop_flag_) { - callback_->on_upload_ok(node->query_id_, file_type, std::move(remote), size); - } - close_node(node_id); - loop(); -} - -void FileLoadManager::on_ok_upload_full(FullRemoteFileLocation remote) { - auto node_id = get_link_token(); - auto node = nodes_container_.get(node_id); - if (node == nullptr) { - return; - } - if (!stop_flag_) { - callback_->on_upload_full_ok(node->query_id_, std::move(remote)); - } - close_node(node_id); - loop(); -} - void FileLoadManager::on_error(Status status) { auto node_id = get_link_token(); on_error_impl(node_id, std::move(status)); diff --git a/td/telegram/files/FileLoadManager.h b/td/telegram/files/FileLoadManager.h index c6d361576..57731bfa3 100644 --- a/td/telegram/files/FileLoadManager.h +++ b/td/telegram/files/FileLoadManager.h @@ -9,11 +9,9 @@ #include "td/telegram/files/FileDownloader.h" #include "td/telegram/files/FileEncryptionKey.h" #include "td/telegram/files/FileFromBytes.h" -#include "td/telegram/files/FileHashUploader.h" #include "td/telegram/files/FileLoaderUtils.h" #include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileType.h" -#include "td/telegram/files/FileUploader.h" #include "td/telegram/files/ResourceManager.h" #include "td/telegram/net/DcId.h" @@ -38,10 +36,6 @@ class FileLoadManager final : public Actor { virtual void on_start_download(QueryId query_id) = 0; virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size, int64 size) = 0; - virtual void on_partial_upload(QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0; - virtual void on_hash(QueryId query_id, string hash) = 0; - virtual void on_upload_ok(QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, int64 size) = 0; - virtual void on_upload_full_ok(QueryId query_id, FullRemoteFileLocation remote) = 0; virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0; virtual void on_error(QueryId query_id, Status status) = 0; }; @@ -51,13 +45,13 @@ class FileLoadManager final : public Actor { void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit, int8 priority); - void upload(QueryId query_id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location, - int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector bad_parts); - void upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, int8 priority); + void update_priority(QueryId query_id, int8 priority); + void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name); + void cancel(QueryId query_id); - void update_local_file_location(QueryId query_id, const LocalFileLocation &local); + void update_downloaded_part(QueryId query_id, int64 offset, int64 limit); void get_content(string file_path, Promise promise); @@ -81,7 +75,6 @@ class FileLoadManager final : public Actor { std::map> download_resource_manager_map_; std::map> download_small_resource_manager_map_; - ActorOwn upload_resource_manager_; Container nodes_container_; unique_ptr callback_; @@ -100,11 +93,7 @@ class FileLoadManager final : public Actor { void on_start_download(); void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size); - void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size); - void on_hash(string hash); void on_ok_download(FullLocalFileLocation local, int64 size, bool is_new); - void on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size); - void on_ok_upload_full(FullRemoteFileLocation remote); void on_error(Status status); void on_error_impl(NodeId node_id, Status status); @@ -130,43 +119,6 @@ class FileLoadManager final : public Actor { } }; - class FileUploaderCallback final : public FileUploader::Callback { - public: - explicit FileUploaderCallback(ActorShared actor_id) : actor_id_(std::move(actor_id)) { - } - - private: - ActorShared actor_id_; - - void on_hash(string hash) final { - send_closure(actor_id_, &FileLoadManager::on_hash, std::move(hash)); - } - void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) final { - send_closure(actor_id_, &FileLoadManager::on_partial_upload, std::move(partial_remote), ready_size); - } - void on_ok(FileType file_type, PartialRemoteFileLocation partial_remote, int64 size) final { - send_closure(std::move(actor_id_), &FileLoadManager::on_ok_upload, file_type, std::move(partial_remote), size); - } - void on_error(Status status) final { - send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status)); - } - }; - class FileHashUploaderCallback final : public FileHashUploader::Callback { - public: - explicit FileHashUploaderCallback(ActorShared actor_id) : actor_id_(std::move(actor_id)) { - } - - private: - ActorShared actor_id_; - - void on_ok(FullRemoteFileLocation remote) final { - send_closure(std::move(actor_id_), &FileLoadManager::on_ok_upload_full, std::move(remote)); - } - void on_error(Status status) final { - send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status)); - } - }; - class FileFromBytesCallback final : public FileFromBytes::Callback { public: explicit FileFromBytesCallback(ActorShared actor_id) : actor_id_(std::move(actor_id)) { diff --git a/td/telegram/files/FileManager.cpp b/td/telegram/files/FileManager.cpp index a174487e2..d2fd74c80 100644 --- a/td/telegram/files/FileManager.cpp +++ b/td/telegram/files/FileManager.cpp @@ -889,6 +889,9 @@ void FileManager::init_actor() { file_load_manager_ = create_actor_on_scheduler("FileLoadManager", G()->get_slow_net_scheduler_id(), make_unique(actor_id(this)), context_->create_reference()); + file_upload_manager_ = create_actor_on_scheduler( + "FileUploadManager", G()->get_slow_net_scheduler_id(), make_unique(actor_id(this)), + context_->create_reference()); file_generate_manager_ = create_actor_on_scheduler( "FileGenerateManager", G()->get_slow_net_scheduler_id(), context_->create_reference()); } @@ -1608,7 +1611,7 @@ void FileManager::do_cancel_upload(FileNodePtr node) { if (node->upload_id_ == 0) { return; } - send_closure(file_load_manager_, &FileLoadManager::cancel, node->upload_id_); + send_closure(file_upload_manager_, &FileUploadManager::cancel, node->upload_id_); node->upload_id_ = 0; node->upload_was_update_file_reference_ = false; node->set_upload_priority(0); @@ -2564,7 +2567,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) { } VLOG(file_references) << "Receive result from reload photo for file " << file_id << ": " << error; - send_closure(actor_id, &FileManager::on_error, query_id, std::move(error)); + send_closure(actor_id, &FileManager::on_download_error, query_id, std::move(error)); })); node->need_reload_photo_ = false; return; @@ -2577,7 +2580,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) { queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference}); node->download_id_ = query_id; if (node->download_was_update_file_reference_) { - return on_error(query_id, Status::Error("Can't download file: have no valid file reference")); + return on_download_error(query_id, Status::Error("Can't download file: have no valid file reference")); } node->download_was_update_file_reference_ = true; @@ -2590,7 +2593,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) { error = res.move_as_error(); } VLOG(file_references) << "Receive result from FileSourceManager for file " << file_id << ": " << error; - send_closure(actor_id, &FileManager::on_error, query_id, std::move(error)); + send_closure(actor_id, &FileManager::on_download_error, query_id, std::move(error)); })); return; } @@ -3008,7 +3011,7 @@ void FileManager::run_generate(FileNodePtr node) { send_closure(actor_, &FileManager::on_generate_ok, query_id_, std::move(local)); } void on_error(Status error) final { - send_closure(actor_, &FileManager::on_error, query_id_, std::move(error)); + send_closure(actor_, &FileManager::on_download_error, query_id_, std::move(error)); } }; return make_unique(file_manager->actor_id(file_manager), query_id); @@ -3084,23 +3087,25 @@ void FileManager::run_upload(FileNodePtr node, vector bad_parts) { if (old_priority != 0) { LOG(INFO) << "File " << file_id << " is already uploading"; CHECK(node->upload_id_ != 0); - send_closure(file_load_manager_, &FileLoadManager::update_priority, node->upload_id_, narrow_cast(-priority)); + send_closure(file_upload_manager_, &FileUploadManager::update_priority, node->upload_id_, + narrow_cast(-priority)); return; } CHECK(node->upload_id_ == 0); if (file_view.has_alive_remote_location() && !file_view.has_active_upload_remote_location() && can_reuse_remote_file(file_view.get_type())) { - FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference}); + FileUploadManager::QueryId query_id = + queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference}); node->upload_id_ = query_id; if (node->upload_was_update_file_reference_) { - return on_error(query_id, Status::Error("Can't upload file: have no valid file reference")); + return on_upload_error(query_id, Status::Error("Can't upload file: have no valid file reference")); } node->upload_was_update_file_reference_ = true; context_->repair_file_reference(node->main_file_id_, PromiseCreator::lambda([actor_id = actor_id(this), query_id](Result res) { - send_closure(actor_id, &FileManager::on_error, query_id, + send_closure(actor_id, &FileManager::on_upload_error, query_id, Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE")); })); return; @@ -3108,10 +3113,10 @@ void FileManager::run_upload(FileNodePtr node, vector bad_parts) { if (!node->remote_.partial && node->get_by_hash_) { LOG(INFO) << "Get file " << node->main_file_id_ << " by hash"; - FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash}); + FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash}); node->upload_id_ = query_id; - send_closure(file_load_manager_, &FileLoadManager::upload_by_hash, query_id, node->local_.full(), node->size_, + send_closure(file_upload_manager_, &FileUploadManager::upload_by_hash, query_id, node->local_.full(), node->size_, narrow_cast(-priority)); return; } @@ -3124,12 +3129,13 @@ void FileManager::run_upload(FileNodePtr node, vector bad_parts) { expected_size = 10 << 20; } - FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload}); + FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload}); node->upload_id_ = query_id; - send_closure(file_load_manager_, &FileLoadManager::upload, query_id, node->local_, node->remote_.partial_or_empty(), - expected_size, node->encryption_key_, new_priority, std::move(bad_parts)); + send_closure(file_upload_manager_, &FileUploadManager::upload, query_id, node->local_, + node->remote_.partial_or_empty(), expected_size, node->encryption_key_, new_priority, + std::move(bad_parts)); - LOG(INFO) << "File " << file_id << " upload request has sent to FileLoadManager"; + LOG(INFO) << "File " << file_id << " upload request has sent to FileUploadManager"; } void FileManager::upload(FileId file_id, std::shared_ptr callback, int32 new_priority, @@ -3812,7 +3818,7 @@ void FileManager::on_hash(FileLoadManager::QueryId query_id, string hash) { file_node->encryption_key_.set_value_hash(secure_storage::ValueHash::create(hash).move_as_ok()); } -void FileManager::on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, +void FileManager::on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) { if (is_closed_) { return; @@ -3847,7 +3853,7 @@ void FileManager::on_download_ok(FileLoadManager::QueryId query_id, FullLocalFil Query query; bool was_active; - std::tie(query, was_active) = finish_query(query_id); + std::tie(query, was_active) = finish_query(static_cast::Id>(query_id)); auto file_id = query.file_id_; LOG(INFO) << "ON DOWNLOAD OK of " << (is_new ? "new" : "checked") << " file " << file_id << " of size " << size; auto r_new_file_id = register_local(std::move(local), DialogId(), size, false, false, true, file_id); @@ -3865,14 +3871,14 @@ void FileManager::on_download_ok(FileLoadManager::QueryId query_id, FullLocalFil } } -void FileManager::on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, +void FileManager::on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation partial_remote, int64 size) { if (is_closed_) { return; } CHECK(partial_remote.ready_part_count_ == partial_remote.part_count_); - auto some_file_id = finish_query(query_id).first.file_id_; + auto some_file_id = finish_query(static_cast::Id>(query_id)).first.file_id_; LOG(INFO) << "ON UPLOAD OK file " << some_file_id << " of size " << size; auto file_node = get_file_node(some_file_id); @@ -3944,12 +3950,12 @@ void FileManager::on_upload_ok(FileLoadManager::QueryId query_id, FileType file_ } // for upload by hash -void FileManager::on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote) { +void FileManager::on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote) { if (is_closed_) { return; } - auto file_id = finish_query(query_id).first.file_id_; + auto file_id = finish_query(static_cast::Id>(query_id)).first.file_id_; LOG(INFO) << "ON UPLOAD FULL OK for file " << file_id; auto new_file_id = register_remote(std::move(remote), FileLocationSource::FromServer, DialogId(), 0, 0, ""); LOG_STATUS(merge(new_file_id, file_id)); @@ -3985,7 +3991,7 @@ void FileManager::on_partial_generate(FileLoadManager::QueryId query_id, Partial run_upload(file_node, {}); } if (file_node->upload_id_ != 0) { - send_closure(file_load_manager_, &FileLoadManager::update_local_file_location, file_node->upload_id_, + send_closure(file_upload_manager_, &FileUploadManager::update_local_file_location, file_node->upload_id_, LocalFileLocation(std::move(partial_local))); } @@ -3999,7 +4005,7 @@ void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFil Query query; bool was_active; - std::tie(query, was_active) = finish_query(query_id); + std::tie(query, was_active) = finish_query(static_cast::Id>(query_id)); auto generate_file_id = query.file_id_; LOG(INFO) << "Receive on_generate_ok for file " << generate_file_id << ": " << local; @@ -4030,20 +4036,36 @@ void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFil if (was_active) { if (old_upload_id != 0 && old_upload_id == file_node->upload_id_) { - send_closure(file_load_manager_, &FileLoadManager::update_local_file_location, file_node->upload_id_, + send_closure(file_upload_manager_, &FileUploadManager::update_local_file_location, file_node->upload_id_, LocalFileLocation(std::move(local))); } } } -void FileManager::on_error(FileLoadManager::QueryId query_id, Status status) { +void FileManager::on_download_error(FileLoadManager::QueryId query_id, Status status) { if (is_closed_) { return; } Query query; bool was_active; - std::tie(query, was_active) = finish_query(query_id); + std::tie(query, was_active) = finish_query(static_cast::Id>(query_id)); + auto node = get_file_node(query.file_id_); + if (!node) { + LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status; + return; + } + on_error_impl(node, query.type_, was_active, std::move(status)); +} + +void FileManager::on_upload_error(FileUploadManager::QueryId query_id, Status status) { + if (is_closed_) { + return; + } + + Query query; + bool was_active; + std::tie(query, was_active) = finish_query(static_cast::Id>(query_id)); auto node = get_file_node(query.file_id_); if (!node) { LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status; @@ -4204,7 +4226,7 @@ void FileManager::on_error_impl(FileNodePtr node, Query::Type type, bool was_act } } -std::pair FileManager::finish_query(FileLoadManager::QueryId query_id) { +std::pair FileManager::finish_query(Container::Id query_id) { SCOPE_EXIT { queries_container_.erase(query_id); }; @@ -4262,10 +4284,17 @@ void FileManager::hangup() { file_db_.reset(); file_generate_manager_.reset(); file_load_manager_.reset(); + file_upload_manager_.reset(); while (!queries_container_.empty()) { auto query_ids = queries_container_.ids(); for (auto query_id : query_ids) { - on_error(query_id, Global::request_aborted_error()); + Query query; + bool was_active; + std::tie(query, was_active) = finish_query(static_cast::Id>(query_id)); + auto node = get_file_node(query.file_id_); + if (node) { + on_error_impl(node, query.type_, was_active, Global::request_aborted_error()); + } } } is_closed_ = true; diff --git a/td/telegram/files/FileManager.h b/td/telegram/files/FileManager.h index 67fd09475..60b5ce7ed 100644 --- a/td/telegram/files/FileManager.h +++ b/td/telegram/files/FileManager.h @@ -16,6 +16,7 @@ #include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileSourceId.h" #include "td/telegram/files/FileType.h" +#include "td/telegram/files/FileUploadManager.h" #include "td/telegram/Location.h" #include "td/telegram/PhotoSizeSource.h" #include "td/telegram/td_api.h" @@ -134,7 +135,7 @@ class FileNode { static constexpr char PERSISTENT_ID_VERSION = 4; LocalFileLocation local_; - FileLoadManager::QueryId upload_id_ = 0; + FileUploadManager::QueryId upload_id_ = 0; int64 download_offset_ = 0; int64 private_download_limit_ = 0; int64 local_ready_size_ = 0; // PartialLocal only @@ -576,30 +577,42 @@ class FileManager final : public Actor { send_closure(actor_id_, &FileManager::on_partial_download, query_id, std::move(partial_local), ready_size, size); } - void on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, - int64 ready_size) final { - send_closure(actor_id_, &FileManager::on_partial_upload, query_id, std::move(partial_remote), ready_size); - } - - void on_hash(FileLoadManager::QueryId query_id, string hash) final { - send_closure(actor_id_, &FileManager::on_hash, query_id, std::move(hash)); - } - - void on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, - int64 size) final { - send_closure(actor_id_, &FileManager::on_upload_ok, query_id, file_type, std::move(remote), size); - } - - void on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote) final { - send_closure(actor_id_, &FileManager::on_upload_full_ok, query_id, std::move(remote)); - } - void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) final { send_closure(actor_id_, &FileManager::on_download_ok, query_id, std::move(local), size, is_new); } void on_error(FileLoadManager::QueryId query_id, Status status) final { - send_closure(actor_id_, &FileManager::on_error, query_id, std::move(status)); + send_closure(actor_id_, &FileManager::on_download_error, query_id, std::move(status)); + } + }; + class FileUploadManagerCallback final : public FileUploadManager::Callback { + public: + explicit FileUploadManagerCallback(ActorId actor_id) : actor_id_(std::move(actor_id)) { + } + + private: + ActorId actor_id_; + + void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, + int64 ready_size) final { + send_closure(actor_id_, &FileManager::on_partial_upload, query_id, std::move(partial_remote), ready_size); + } + + void on_hash(FileUploadManager::QueryId query_id, string hash) final { + send_closure(actor_id_, &FileManager::on_hash, query_id, std::move(hash)); + } + + void on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, + int64 size) final { + send_closure(actor_id_, &FileManager::on_upload_ok, query_id, file_type, std::move(remote), size); + } + + void on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote) final { + send_closure(actor_id_, &FileManager::on_upload_full_ok, query_id, std::move(remote)); + } + + void on_error(FileUploadManager::QueryId query_id, Status status) final { + send_closure(actor_id_, &FileManager::on_upload_error, query_id, std::move(status)); } }; @@ -680,6 +693,7 @@ class FileManager final : public Actor { WaitFreeVector empty_file_ids_; WaitFreeVector> file_nodes_; ActorOwn file_load_manager_; + ActorOwn file_upload_manager_; ActorOwn file_generate_manager_; Container queries_container_; @@ -756,20 +770,23 @@ class FileManager final : public Actor { void on_start_download(FileLoadManager::QueryId query_id); void on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size, int64 size); - void on_hash(FileLoadManager::QueryId query_id, string hash); - void on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size); void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new); - void on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation partial_remote, + void on_download_error(FileLoadManager::QueryId query_id, Status status); + + void on_hash(FileUploadManager::QueryId query_id, string hash); + void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, + int64 ready_size); + void on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation partial_remote, int64 size); - void on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote); - void on_error(FileLoadManager::QueryId query_id, Status status); + void on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote); + void on_upload_error(FileUploadManager::QueryId query_id, Status status); void on_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status); void on_partial_generate(FileLoadManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size); void on_generate_ok(FileLoadManager::QueryId, FullLocalFileLocation local); - std::pair finish_query(FileLoadManager::QueryId query_id); + std::pair finish_query(Container::Id query_id); FullRemoteFileLocation *get_remote(int32 key); diff --git a/td/telegram/files/FileUploadManager.cpp b/td/telegram/files/FileUploadManager.cpp new file mode 100644 index 000000000..b04bba70c --- /dev/null +++ b/td/telegram/files/FileUploadManager.cpp @@ -0,0 +1,196 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/files/FileUploadManager.h" + +#include "td/telegram/Global.h" + +#include "td/utils/common.h" +#include "td/utils/SliceBuilder.h" + +namespace td { + +FileUploadManager::Callback::~Callback() = default; + +FileUploadManager::FileUploadManager(unique_ptr callback, ActorShared<> parent) + : callback_(std::move(callback)), parent_(std::move(parent)) { +} + +void FileUploadManager::start_up() { + constexpr int64 MAX_UPLOAD_RESOURCE_LIMIT = 4 << 20; + upload_resource_manager_ = create_actor( + "UploadResourceManager", MAX_UPLOAD_RESOURCE_LIMIT, + !G()->keep_media_order() ? ResourceManager::Mode::Greedy : ResourceManager::Mode::Baseline); +} + +void FileUploadManager::upload(QueryId query_id, const LocalFileLocation &local_location, + const RemoteFileLocation &remote_location, int64 expected_size, + const FileEncryptionKey &encryption_key, int8 priority, vector bad_parts) { + if (stop_flag_) { + return; + } + NodeId node_id = nodes_container_.create(Node()); + Node *node = nodes_container_.get(node_id); + CHECK(node); + node->query_id_ = query_id; + auto callback = make_unique(actor_shared(this, node_id)); + node->loader_ = create_actor("Uploader", local_location, remote_location, expected_size, encryption_key, + std::move(bad_parts), std::move(callback)); + send_closure(upload_resource_manager_, &ResourceManager::register_worker, + ActorShared(node->loader_.get(), static_cast(-1)), priority); + bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second; + CHECK(is_inserted); +} + +void FileUploadManager::upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, + int8 priority) { + if (stop_flag_) { + return; + } + NodeId node_id = nodes_container_.create(Node()); + Node *node = nodes_container_.get(node_id); + CHECK(node); + node->query_id_ = query_id; + auto callback = make_unique(actor_shared(this, node_id)); + node->loader_ = create_actor("HashUploader", local_location, size, std::move(callback)); + send_closure(upload_resource_manager_, &ResourceManager::register_worker, + ActorShared(node->loader_.get(), static_cast(-1)), priority); + bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second; + CHECK(is_inserted); +} + +void FileUploadManager::update_priority(QueryId query_id, int8 priority) { + if (stop_flag_) { + return; + } + auto it = query_id_to_node_id_.find(query_id); + if (it == query_id_to_node_id_.end()) { + return; + } + auto node = nodes_container_.get(it->second); + if (node == nullptr) { + return; + } + send_closure(node->loader_, &FileLoaderActor::update_priority, priority); +} + +void FileUploadManager::cancel(QueryId query_id) { + if (stop_flag_) { + return; + } + auto it = query_id_to_node_id_.find(query_id); + if (it == query_id_to_node_id_.end()) { + return; + } + on_error_impl(it->second, Status::Error(-1, "Canceled")); +} + +void FileUploadManager::update_local_file_location(QueryId query_id, const LocalFileLocation &local) { + if (stop_flag_) { + return; + } + auto it = query_id_to_node_id_.find(query_id); + if (it == query_id_to_node_id_.end()) { + return; + } + auto node = nodes_container_.get(it->second); + if (node == nullptr) { + return; + } + send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local); +} + +void FileUploadManager::hangup() { + nodes_container_.for_each([](auto query_id, auto &node) { node.loader_.reset(); }); + stop_flag_ = true; + loop(); +} + +void FileUploadManager::on_hash(string hash) { + auto node_id = get_link_token(); + auto node = nodes_container_.get(node_id); + if (node == nullptr) { + return; + } + if (!stop_flag_) { + callback_->on_hash(node->query_id_, std::move(hash)); + } +} + +void FileUploadManager::on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) { + auto node_id = get_link_token(); + auto node = nodes_container_.get(node_id); + if (node == nullptr) { + return; + } + if (!stop_flag_) { + callback_->on_partial_upload(node->query_id_, std::move(partial_remote), ready_size); + } +} + +void FileUploadManager::on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size) { + auto node_id = get_link_token(); + auto node = nodes_container_.get(node_id); + if (node == nullptr) { + return; + } + if (!stop_flag_) { + callback_->on_upload_ok(node->query_id_, file_type, std::move(remote), size); + } + close_node(node_id); + loop(); +} + +void FileUploadManager::on_ok_upload_full(FullRemoteFileLocation remote) { + auto node_id = get_link_token(); + auto node = nodes_container_.get(node_id); + if (node == nullptr) { + return; + } + if (!stop_flag_) { + callback_->on_upload_full_ok(node->query_id_, std::move(remote)); + } + close_node(node_id); + loop(); +} + +void FileUploadManager::on_error(Status status) { + auto node_id = get_link_token(); + on_error_impl(node_id, std::move(status)); +} + +void FileUploadManager::on_error_impl(NodeId node_id, Status status) { + auto node = nodes_container_.get(node_id); + if (node == nullptr) { + status.ignore(); + return; + } + if (!stop_flag_) { + callback_->on_error(node->query_id_, std::move(status)); + } + close_node(node_id); + loop(); +} + +void FileUploadManager::hangup_shared() { + auto node_id = get_link_token(); + on_error_impl(node_id, Status::Error(-1, "Canceled")); +} + +void FileUploadManager::loop() { + if (stop_flag_ && nodes_container_.empty()) { + stop(); + } +} + +void FileUploadManager::close_node(NodeId node_id) { + auto node = nodes_container_.get(node_id); + CHECK(node); + query_id_to_node_id_.erase(node->query_id_); + nodes_container_.erase(node_id); +} + +} // namespace td diff --git a/td/telegram/files/FileUploadManager.h b/td/telegram/files/FileUploadManager.h new file mode 100644 index 000000000..3caa74f98 --- /dev/null +++ b/td/telegram/files/FileUploadManager.h @@ -0,0 +1,121 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/telegram/files/FileEncryptionKey.h" +#include "td/telegram/files/FileHashUploader.h" +#include "td/telegram/files/FileLocation.h" +#include "td/telegram/files/FileType.h" +#include "td/telegram/files/FileUploader.h" +#include "td/telegram/files/ResourceManager.h" + +#include "td/actor/actor.h" + +#include "td/utils/common.h" +#include "td/utils/Container.h" +#include "td/utils/Status.h" + +#include + +namespace td { + +class FileUploadManager final : public Actor { + public: + using QueryId = uint64; + class Callback { + public: + virtual ~Callback(); + virtual void on_partial_upload(QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0; + virtual void on_hash(QueryId query_id, string hash) = 0; + virtual void on_upload_ok(QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, int64 size) = 0; + virtual void on_upload_full_ok(QueryId query_id, FullRemoteFileLocation remote) = 0; + virtual void on_error(QueryId query_id, Status status) = 0; + }; + + explicit FileUploadManager(unique_ptr callback, ActorShared<> parent); + + void upload(QueryId query_id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location, + int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector bad_parts); + + void upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, int8 priority); + + void update_priority(QueryId query_id, int8 priority); + + void cancel(QueryId query_id); + + void update_local_file_location(QueryId query_id, const LocalFileLocation &local); + + private: + struct Node { + QueryId query_id_; + ActorOwn loader_; + ResourceState resource_state_; + }; + using NodeId = uint64; + + ActorOwn upload_resource_manager_; + + Container nodes_container_; + unique_ptr callback_; + ActorShared<> parent_; + std::map query_id_to_node_id_; + bool stop_flag_ = false; + + void start_up() final; + void loop() final; + void hangup() final; + void hangup_shared() final; + + void close_node(NodeId node_id); + + void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size); + void on_hash(string hash); + void on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size); + void on_ok_upload_full(FullRemoteFileLocation remote); + void on_error(Status status); + void on_error_impl(NodeId node_id, Status status); + + class FileUploaderCallback final : public FileUploader::Callback { + public: + explicit FileUploaderCallback(ActorShared actor_id) : actor_id_(std::move(actor_id)) { + } + + private: + ActorShared actor_id_; + + void on_hash(string hash) final { + send_closure(actor_id_, &FileUploadManager::on_hash, std::move(hash)); + } + void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) final { + send_closure(actor_id_, &FileUploadManager::on_partial_upload, std::move(partial_remote), ready_size); + } + void on_ok(FileType file_type, PartialRemoteFileLocation partial_remote, int64 size) final { + send_closure(std::move(actor_id_), &FileUploadManager::on_ok_upload, file_type, std::move(partial_remote), size); + } + void on_error(Status status) final { + send_closure(std::move(actor_id_), &FileUploadManager::on_error, std::move(status)); + } + }; + + class FileHashUploaderCallback final : public FileHashUploader::Callback { + public: + explicit FileHashUploaderCallback(ActorShared actor_id) : actor_id_(std::move(actor_id)) { + } + + private: + ActorShared actor_id_; + + void on_ok(FullRemoteFileLocation remote) final { + send_closure(std::move(actor_id_), &FileUploadManager::on_ok_upload_full, std::move(remote)); + } + void on_error(Status status) final { + send_closure(std::move(actor_id_), &FileUploadManager::on_error, std::move(status)); + } + }; +}; + +} // namespace td