From bb0eb350277228761f0b5da510774d120810bea5 Mon Sep 17 00:00:00 2001 From: levlam Date: Sat, 13 Jul 2024 00:59:24 +0300 Subject: [PATCH] Inline FileLoader to FileUploader. --- td/telegram/files/FileLoader.cpp | 18 -- td/telegram/files/FileLoader.h | 12 +- td/telegram/files/FileUploader.cpp | 266 +++++++++++++++++++++++------ td/telegram/files/FileUploader.h | 79 ++++++--- 4 files changed, 276 insertions(+), 99 deletions(-) diff --git a/td/telegram/files/FileLoader.cpp b/td/telegram/files/FileLoader.cpp index 4ace79f26..9458c1b60 100644 --- a/td/telegram/files/FileLoader.cpp +++ b/td/telegram/files/FileLoader.cpp @@ -11,7 +11,6 @@ #include "td/telegram/net/NetQueryDispatcher.h" #include "td/telegram/UniqueId.h" -#include "td/utils/common.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/misc.h" @@ -52,23 +51,6 @@ void FileLoader::hangup_shared() { } } -void FileLoader::update_local_file_location(const LocalFileLocation &local) { - auto r_prefix_info = on_update_local_location(local, parts_manager_.get_size_or_zero()); - if (r_prefix_info.is_error()) { - on_error(r_prefix_info.move_as_error()); - stop_flag_ = true; - return; - } - auto prefix_info = r_prefix_info.move_as_ok(); - auto status = parts_manager_.set_known_prefix(prefix_info.size, prefix_info.is_ready); - if (status.is_error()) { - on_error(std::move(status)); - stop_flag_ = true; - return; - } - loop(); -} - void FileLoader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) { if (parts_manager_.get_streaming_offset() != offset) { auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit); diff --git a/td/telegram/files/FileLoader.h b/td/telegram/files/FileLoader.h index 4393c7096..2fd4ec152 100644 --- a/td/telegram/files/FileLoader.h +++ b/td/telegram/files/FileLoader.h @@ -31,17 +31,14 @@ class FileLoader : public FileLoaderActor { void update_priority(int8 priority) final; void update_resources(const ResourceState &other) final; - void update_local_file_location(const LocalFileLocation &local) final; + void update_local_file_location(const LocalFileLocation &local) final { + } void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) final; protected: void set_ordered_flag(bool flag); size_t get_part_size() const; - struct PrefixInfo { - int64 size = -1; - bool is_ready = false; - }; struct FileInfo { int64 size{0}; int64 expected_size{0}; @@ -75,10 +72,7 @@ class FileLoader : public FileLoaderActor { int64 size{0}; }; virtual void on_progress(Progress progress) = 0; - virtual Result on_update_local_location(const LocalFileLocation &location, - int64 file_size) TD_WARN_UNUSED_RESULT { - return Status::Error("Unsupported"); - } + virtual Result should_restart_part(Part part, const NetQueryPtr &net_query) TD_WARN_UNUSED_RESULT { return false; } diff --git a/td/telegram/files/FileUploader.cpp b/td/telegram/files/FileUploader.cpp index 27eeab4c5..73262b370 100644 --- a/td/telegram/files/FileUploader.cpp +++ b/td/telegram/files/FileUploader.cpp @@ -12,6 +12,7 @@ #include "td/telegram/net/NetQueryDispatcher.h" #include "td/telegram/SecureStorage.h" #include "td/telegram/telegram_api.h" +#include "td/telegram/UniqueId.h" #include "td/utils/buffer.h" #include "td/utils/crypto.h" @@ -43,15 +44,21 @@ FileUploader::FileUploader(const LocalFileLocation &local, const RemoteFileLocat } } -Result FileUploader::init() { +void FileUploader::start_up() { if (remote_.type() == RemoteFileLocation::Type::Full) { - return Status::Error("File is already uploaded"); + on_error(Status::Error("File is already uploaded")); + stop_flag_ = true; + return; } // file_size is needed only for partial local locations, but for uploaded partial files // size is yet unknown or local location is full, so we can always pass 0 here - TRY_RESULT(prefix_info, on_update_local_location(local_, 0)); - (void)prefix_info; + auto r_prefix_info = on_update_local_location(local_, 0); + if (r_prefix_info.is_error()) { + on_error(r_prefix_info.move_as_error()); + stop_flag_ = true; + return; + } int offset = 0; int part_size = 0; @@ -66,20 +73,20 @@ Result FileUploader::init() { big_flag_ = is_file_big(file_type_, expected_size_); } - std::vector ok(offset, true); + vector ok(offset, true); for (auto bad_id : bad_parts_) { if (bad_id >= 0 && bad_id < offset) { ok[bad_id] = false; } } - std::vector parts; + vector ready_parts; for (int i = 0; i < offset; i++) { if (ok[i]) { - parts.push_back(i); + ready_parts.push_back(i); } } if (!ok.empty() && !ok[0]) { - parts.clear(); + ready_parts.clear(); part_size = 0; remote_ = RemoteFileLocation(); file_id_ = Random::secure_int64(); @@ -87,18 +94,36 @@ Result FileUploader::init() { } LOG(DEBUG) << "Init file uploader for " << remote_ << " with offset = " << offset << " and part size = " << part_size; - FileInfo res; - res.size = local_size_; - res.expected_size = expected_size_; - res.is_size_final = local_is_ready_; - res.part_size = part_size; - res.ready_parts = std::move(parts); - res.is_upload = true; - return res; + + auto expected_size = max(local_size_, expected_size_); + + // Two cases when FILE_UPLOAD_RESTART will happen + // 1. File is ready, size is final. But there are more uploaded parts than size of the file + // pm.init(1, 100000, true, 10, {0, 1, 2}, false, true).ensure_error(); + // This error is definitely ok, because we are using actual size of the file on disk (mtime is checked by + // somebody else). And actual size could change arbitrarily. + // + // 2. File size is not final, and some parts ending after known file size were uploaded + // pm.init(0, 100000, false, 10, {0, 1, 2}, false, true).ensure_error(); + // This can happen only if file state became inconsistent at some point. For example, local location was deleted, + // but partial remote location was kept. This is possible, but probably should be fixed. + auto status = parts_manager_.init(local_size_, expected_size, local_is_ready_, part_size, ready_parts, true, true); + LOG(DEBUG) << "Start uploading a file of size " << local_size_ << " with expected " + << (local_is_ready_ ? "exact" : "approximate") << " size " << expected_size << ", part size " << part_size + << " and " << ready_parts.size() << " ready parts: " << status; + if (status.is_error()) { + on_error(std::move(status)); + stop_flag_ = true; + return; + } + resource_state_.set_unit_size(parts_manager_.get_part_size()); + update_estimated_limit(); + on_progress(); + yield(); } -Result FileUploader::on_update_local_location(const LocalFileLocation &location, - int64 file_size) { +Result FileUploader::on_update_local_location(const LocalFileLocation &location, + int64 file_size) { SCOPE_EXIT { try_release_fd(); }; @@ -195,15 +220,6 @@ Result FileUploader::on_update_local_location(const Loca return info; } -Status FileUploader::on_ok(int64 size) { - fd_.close(); - if (is_temp_) { - LOG(INFO) << "UNLINK " << fd_path_; - unlink(fd_path_).ignore(); - } - return Status::OK(); -} - void FileUploader::on_error(Status status) { fd_.close(); if (is_temp_) { @@ -215,7 +231,7 @@ void FileUploader::on_error(Status status) { Status FileUploader::generate_iv_map() { LOG(INFO) << "Generate iv_map " << generate_offset_ << " " << local_size_; - auto part_size = get_part_size(); + auto part_size = parts_manager_.get_part_size(); auto encryption_key = FileEncryptionKey(encryption_key_.key_slice(), generate_iv_); BufferSlice bytes(part_size); if (iv_map_.empty()) { @@ -236,19 +252,7 @@ Status FileUploader::generate_iv_map() { return Status::OK(); } -Status FileUploader::before_start_parts() { - auto status = acquire_fd(); - if (status.is_error() && !local_is_ready_) { - return Status::Error(-1, "Can't open temporary file"); - } - return status; -} - -void FileUploader::after_start_parts() { - try_release_fd(); -} - -Result FileUploader::start_part(Part part, int32 part_count, int64 streaming_offset) { +Result FileUploader::start_part(Part part, int32 part_count) { auto padded_size = part.size; if (encryption_key_.is_secret()) { padded_size = (padded_size + 15) & ~15; @@ -307,23 +311,19 @@ Result FileUploader::process_part(Part part, NetQueryPtr net_query) { return part.size; } -void FileUploader::on_progress(Progress progress) { - callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size, - progress.ready_part_count, big_flag_}, - progress.ready_size); - if (progress.is_ready) { +void FileUploader::on_progress() { + auto part_count = parts_manager_.get_part_count(); + auto part_size = static_cast(parts_manager_.get_part_size()); + auto ready_part_count = parts_manager_.get_ready_prefix_count(); + callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_}, + parts_manager_.get_ready_size()); + if (parts_manager_.ready()) { callback_->on_ok(file_type_, - PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size, - progress.ready_part_count, big_flag_}, + PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_}, local_size_); } } -void FileUploader::keep_fd_flag(bool keep_fd) { - keep_fd_ = keep_fd; - try_release_fd(); -} - void FileUploader::try_release_fd() { if (!keep_fd_ && !fd_.empty()) { fd_.close(); @@ -337,4 +337,164 @@ Status FileUploader::acquire_fd() { return Status::OK(); } +void FileUploader::set_resource_manager(ActorShared resource_manager) { + resource_manager_ = std::move(resource_manager); + send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); +} + +void FileUploader::update_priority(int8 priority) { + send_closure(resource_manager_, &ResourceManager::update_priority, priority); +} + +void FileUploader::update_resources(const ResourceState &other) { + resource_state_.update_slave(other); + VLOG(file_loader) << "Update resources " << resource_state_; + loop(); +} + +void FileUploader::update_local_file_location(const LocalFileLocation &local) { + auto r_prefix_info = on_update_local_location(local, parts_manager_.get_size_or_zero()); + if (r_prefix_info.is_error()) { + on_error(r_prefix_info.move_as_error()); + stop_flag_ = true; + return; + } + auto prefix_info = r_prefix_info.move_as_ok(); + auto status = parts_manager_.set_known_prefix(prefix_info.size, prefix_info.is_ready); + if (status.is_error()) { + on_error(std::move(status)); + stop_flag_ = true; + return; + } + loop(); +} + +void FileUploader::loop() { + if (stop_flag_) { + return; + } + auto status = do_loop(); + if (status.is_error()) { + if (status.code() == -1) { + return; + } + on_error(std::move(status)); + stop_flag_ = true; + return; + } +} + +Status FileUploader::do_loop() { + if (parts_manager_.may_finish()) { + TRY_STATUS(parts_manager_.finish()); + fd_.close(); + if (is_temp_) { + LOG(INFO) << "UNLINK " << fd_path_; + unlink(fd_path_).ignore(); + } + stop_flag_ = true; + return Status::OK(); + } + + auto status = acquire_fd(); + if (status.is_error()) { + if (!local_is_ready_) { + return Status::Error(-1, "Can't open temporary file"); + } + return status; + } + + SCOPE_EXIT { + try_release_fd(); + }; + while (true) { + if (resource_state_.unused() < narrow_cast(parts_manager_.get_part_size())) { + VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource"; + break; + } + TRY_RESULT(part, parts_manager_.start_part()); + if (part.size == 0) { + break; + } + VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size); + resource_state_.start_use(static_cast(part.size)); + + TRY_RESULT(query, start_part(part, parts_manager_.get_part_count())); + uint64 unique_id = UniqueId::next(); + part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new()); + + G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, unique_id)); + } + return Status::OK(); +} + +void FileUploader::tear_down() { + for (auto &it : part_map_) { + it.second.second.reset(); // cancel_query(it.second.second); + } +} + +void FileUploader::update_estimated_limit() { + if (stop_flag_) { + return; + } + auto estimated_extra = parts_manager_.get_estimated_extra(); + resource_state_.update_estimated_limit(estimated_extra); + VLOG(file_loader) << "Update estimated limit " << estimated_extra; + if (!resource_manager_.empty()) { + keep_fd_ = narrow_cast(resource_state_.active_limit()) >= parts_manager_.get_part_size(); + try_release_fd(); + send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); + } +} + +void FileUploader::on_result(NetQueryPtr query) { + if (stop_flag_) { + return; + } + auto unique_id = get_link_token(); + auto it = part_map_.find(unique_id); + if (it == part_map_.end()) { + LOG(ERROR) << "Receive result for unknown part"; + return; + } + + Part part = it->second.first; + it->second.second.release(); + CHECK(query->is_ready()); + part_map_.erase(it); + + bool should_restart = query->is_error() && query->error().code() == NetQuery::Error::Canceled; + if (should_restart) { + VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size); + resource_state_.stop_use(static_cast(part.size)); + parts_manager_.on_part_failed(part.id); + } else { + on_part_query(part, std::move(query)); + } + update_estimated_limit(); + loop(); +} + +void FileUploader::on_part_query(Part part, NetQueryPtr query) { + if (stop_flag_) { + // important for secret files + return; + } + auto status = try_on_part_query(part, std::move(query)); + if (status.is_error()) { + on_error(std::move(status)); + stop_flag_ = true; + } +} + +Status FileUploader::try_on_part_query(Part part, NetQueryPtr query) { + TRY_RESULT(size, process_part(part, std::move(query))); + VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size); + resource_state_.stop_use(static_cast(part.size)); + TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size)); + on_progress(); + return Status::OK(); +} + } // namespace td diff --git a/td/telegram/files/FileUploader.h b/td/telegram/files/FileUploader.h index 1de0826dc..6385a216c 100644 --- a/td/telegram/files/FileUploader.h +++ b/td/telegram/files/FileUploader.h @@ -7,20 +7,25 @@ #pragma once #include "td/telegram/files/FileEncryptionKey.h" -#include "td/telegram/files/FileLoader.h" +#include "td/telegram/files/FileLoaderActor.h" #include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileType.h" +#include "td/telegram/files/PartsManager.h" +#include "td/telegram/files/ResourceManager.h" +#include "td/telegram/files/ResourceState.h" +#include "td/telegram/net/NetQuery.h" #include "td/utils/common.h" #include "td/utils/port/FileFd.h" #include "td/utils/Status.h" #include "td/utils/UInt.h" +#include #include namespace td { -class FileUploader final : public FileLoader { +class FileUploader final : public FileLoaderActor { public: class Callback { public: @@ -37,20 +42,29 @@ class FileUploader final : public FileLoader { FileUploader(const LocalFileLocation &local, const RemoteFileLocation &remote, int64 expected_size, const FileEncryptionKey &encryption_key, std::vector bad_parts, unique_ptr callback); - // Should just implement all parent pure virtual methods. - // Must not call any of them... + void set_resource_manager(ActorShared resource_manager) final; + + void update_priority(int8 priority) final; + + void update_resources(const ResourceState &other) final; + + void update_local_file_location(const LocalFileLocation &local) final; + + void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) { + } + private: LocalFileLocation local_; RemoteFileLocation remote_; int64 expected_size_; FileEncryptionKey encryption_key_; - std::vector bad_parts_; + vector bad_parts_; unique_ptr callback_; int64 local_size_ = 0; bool local_is_ready_ = false; FileType file_type_ = FileType::Temp; - std::vector iv_map_; + vector iv_map_; UInt256 iv_; string generate_iv_; int64 generate_offset_ = 0; @@ -58,27 +72,54 @@ class FileUploader final : public FileLoader { FileFd fd_; string fd_path_; - bool is_temp_ = false; int64 file_id_ = 0; + bool is_temp_ = false; bool big_flag_ = false; + bool keep_fd_ = false; + bool stop_flag_ = false; - Result init() final TD_WARN_UNUSED_RESULT; - Status on_ok(int64 size) final TD_WARN_UNUSED_RESULT; - void on_error(Status status) final; - Status before_start_parts() final; - void after_start_parts() final; - Result start_part(Part part, int32 part_count, int64 streaming_offset) final TD_WARN_UNUSED_RESULT; - Result process_part(Part part, NetQueryPtr net_query) final TD_WARN_UNUSED_RESULT; - void on_progress(Progress progress) final; - Result on_update_local_location(const LocalFileLocation &location, - int64 file_size) final TD_WARN_UNUSED_RESULT; + ActorShared resource_manager_; + ResourceState resource_state_; + PartsManager parts_manager_; + std::map>> part_map_; + + void on_error(Status status); + + Result start_part(Part part, int32 part_count) TD_WARN_UNUSED_RESULT; + + Result process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT; + + void on_progress(); + + struct PrefixInfo { + int64 size = -1; + bool is_ready = false; + }; + Result on_update_local_location(const LocalFileLocation &location, int64 file_size) TD_WARN_UNUSED_RESULT; Status generate_iv_map(); - bool keep_fd_ = false; - void keep_fd_flag(bool keep_fd) final; void try_release_fd(); + Status acquire_fd() TD_WARN_UNUSED_RESULT; + + void start_up() final; + + void loop() final; + + Status do_loop(); + + void tear_down() final; + + void update_estimated_limit(); + + void on_result(NetQueryPtr query) final; + + void on_part_query(Part part, NetQueryPtr query); + + void on_common_query(NetQueryPtr query); + + Status try_on_part_query(Part part, NetQueryPtr query); }; } // namespace td