diff --git a/td/telegram/FileReferenceManager.cpp b/td/telegram/FileReferenceManager.cpp index 6d1ee7089..7cfcc7862 100644 --- a/td/telegram/FileReferenceManager.cpp +++ b/td/telegram/FileReferenceManager.cpp @@ -46,62 +46,185 @@ FileSourceId FileReferenceManager::create_message_file_source(FullMessageId full return source_id; } -void FileReferenceManager::update_file_reference(FileId file_id, vector file_source_ids, - Promise<> promise) { - VLOG(file_references) << "Trying to load valid file_reference from server: " << file_id << " " << file_source_ids; - MultiPromiseActorSafe mpas{"UpdateFileReferenceMultiPromiseActor"}; - mpas.set_ignore_errors(true); - mpas.add_promise(std::move(promise)); - auto lock = mpas.get_promise(); - for (auto source_id : file_source_ids) { - auto index = static_cast(source_id.get()) - 1; - CHECK(index < file_sources_.size()); +void FileReferenceManager::add_file_source(NodeId node_id, FileSourceId file_source_id) { + VLOG(file_references) << "add_file_source: " << node_id << " " << file_source_id; + nodes_[node_id].file_source_ids.add(file_source_id); +} +void FileReferenceManager::remove_file_source(NodeId node_id, FileSourceId file_source_id) { + VLOG(file_references) << "remove_file_source: " << node_id << " " << file_source_id; + nodes_[node_id].file_source_ids.remove(file_source_id); +} - auto new_promise = PromiseCreator::lambda([promise = mpas.get_promise(), file_id, source_id, - file_manager = G()->file_manager()](Result result) mutable { - if (result.is_error() && result.error().code() != 429 && result.error().code() < 500 && !G()->close_flag()) { - VLOG(file_references) << "Invalid source id " << source_id << " " << result.error(); - send_closure(file_manager, &FileManager::remove_file_source, file_id, source_id); +void merge(std::vector> &a, std::vector> &b) { + if (a.size() < b.size()) { + std::swap(a, b); + } + for (auto &x : b) { + a.push_back(std::move(x)); + } +} + +void FileReferenceManager::merge(NodeId to_node_id, NodeId from_node_id) { + VLOG(file_references) << "merge: " << to_node_id << " " << from_node_id; + auto &to = nodes_[to_node_id]; + auto &from = nodes_[from_node_id]; + CHECK(!to.query || to.query->proxy.empty()); + CHECK(!from.query || from.query->proxy.empty()); + if (to.query || from.query) { + if (!to.query) { + to.query = make_unique(); + to.query->generation = ++query_generation; + } + if (from.query) { + ::td::merge(to.query->promises, from.query->promises); + to.query->active_queries += from.query->active_queries; + from.query->proxy = {to_node_id, to.query->generation}; + } + } + to.file_source_ids.merge(std::move(from.file_source_ids)); + run_node(to_node_id); + run_node(from_node_id); +} + +void FileReferenceManager::run_node(NodeId node_id) { + VLOG(file_references) << "run_node: " << node_id; + Node &node = nodes_[node_id]; + if (!node.query) { + return; + } + if (node.query->active_queries != 0) { + return; + } + if (node.query->promises.empty()) { + node.query = {}; + return; + } + if (!node.file_source_ids.has_next()) { + for (auto &p : node.query->promises) { + p.set_value(Unit()); + } + node.query = {}; + return; + } + auto file_source_id = node.file_source_ids.next(); + send_query({node_id, node.query->generation}, file_source_id); +} + +void FileReferenceManager::send_query(Destination dest, FileSourceId file_source_id) { + VLOG(file_references) << "send_query " << dest.node_id << " " << dest.generation << " " << file_source_id; + auto &node = nodes_[dest.node_id]; + node.query->active_queries++; + + auto promise = PromiseCreator::lambda([dest, file_source_id, file_reference_manager = G()->file_reference_manager(), + file_manager = G()->file_manager()](Result result) mutable { + auto new_promise = + PromiseCreator::lambda([dest, file_source_id, file_reference_manager](Result result) mutable { + Status status; + if (result.is_error()) { + status = result.move_as_error(); + } + send_closure(file_reference_manager, &FileReferenceManager::on_query_result, dest, file_source_id, + std::move(status), 0); + }); + if (result.is_error()) { + new_promise.set_result(std::move(result)); + } + send_lambda(file_manager, [file_manager, dest, new_promise = std::move(new_promise)]() mutable { + auto view = file_manager.get_actor_unsafe()->get_file_view(dest.node_id); + if (view.has_active_remote_location()) { + new_promise.set_value({}); + } else { + new_promise.set_error(Status::Error("No active remote location")); } - // NB: main promise must send closure to FileManager - // So the closure will be executed only after the bad source id is removed - promise.set_value(Unit()); }); - file_sources_[index].visit(overloaded( - [&](const FileSourceMessage &source) { - send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server, - vector{source.full_message_id}, std::move(new_promise), nullptr); - }, - [&](const FileSourceUserPhoto &source) { + }); + auto index = static_cast(file_source_id.get()) - 1; + CHECK(index < file_sources_.size()); + file_sources_[index].visit(overloaded( + [&](const FileSourceMessage &source) { + send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server, + vector{source.full_message_id}, std::move(promise), nullptr); + }, + [&](const FileSourceUserPhoto &source) { // send_closure_later(G()->contacts_manager(), &ContactsManager::get_user_photo_from_server, source.user_id, - // source.photo_id, std::move(new_promise)); - }, - [&](const FileSourceChatPhoto &source) { + // source.photo_id, std::move(promise)); + }, + [&](const FileSourceChatPhoto &source) { // send_closure_later(G()->contacts_manager(), &ContactsManager::get_chat_photo_from_server, source.chat_id, - // std::move(new_promise)); - }, - [&](const FileSourceChannelPhoto &source) { + // std::move(promise)); + }, + [&](const FileSourceChannelPhoto &source) { // send_closure_later(G()->contacts_manager(), &ContactsManager::get_channel_photo_from_server, - // source.channel_id, std::move(new_promise)); - }, - [&](const FileSourceWallpapers &source) { + // source.channel_id, std::move(promise)); + }, + [&](const FileSourceWallpapers &source) { // send_closure_later(G()->wallpaper_manager(), &WallpaperManager::get_wallpapers_from_server, - // std::move(new_promise)); - }, - [&](const FileSourceWebPage &source) { - send_closure_later(G()->web_pages_manager(), &WebPagesManager::reload_web_page_by_url, source.url, - std::move(new_promise)); - }, - [&](const FileSourceSavedAnimations &source) { - /* + // std::move(promise)); + }, + [&](const FileSourceWebPage &source) { + send_closure_later(G()->web_pages_manager(), &WebPagesManager::reload_web_page_by_url, source.url, + std::move(promise)); + }, + [&](const FileSourceSavedAnimations &source) { + /* // TODO this is wrong, because we shouldn't pass animations hash to the call // we also sometimes need to do two simultaneous calls one with and one without hash send_closure_later(G()->animations_manager(), &AnimationsManager::reload_saved_animations, - true, std::move(new_promise)); + true, std::move(promise)); */ - })); - } - lock.set_value(Unit()); + })); } +FileReferenceManager::Destination FileReferenceManager::on_query_result(Destination dest, FileSourceId file_source_id, + Status status, int32 sub) { + VLOG(file_references) << "on_query_result " << dest.node_id << " " << dest.generation << " " << file_source_id << " " + << status << " " << sub; + auto &node = nodes_[dest.node_id]; + + auto query = node.query.get(); + if (!query) { + return {}; + } + if (query->generation != dest.generation) { + return {}; + } + query->active_queries--; + + if (!query->proxy.empty()) { + query->active_queries -= sub; + auto new_proxy = on_query_result(query->proxy, file_source_id, std::move(status), query->active_queries); + if (!new_proxy.empty()) { + query->proxy = new_proxy; + } + run_node(dest.node_id); + return new_proxy; + } + + if (status.is_ok()) { + for (auto &p : query->promises) { + p.set_value(Unit()); + } + node.query = {}; + } + if (status.is_error() && status.error().code() != 429 && status.error().code() < 500 && !G()->close_flag()) { + VLOG(file_references) << "Invalid source id " << file_source_id << " " << status; + remove_file_source(dest.node_id, file_source_id); + } + + run_node(dest.node_id); + return dest; +} + +void FileReferenceManager::update_file_reference(NodeId node_id, Promise<> promise) { + VLOG(file_references) << "update_file_reference " << node_id; + auto &node = nodes_[node_id]; + if (!node.query) { + node.query = make_unique(); + node.query->promises.push_back(std::move(promise)); + node.query->generation = ++query_generation; + node.file_source_ids.reset_position(); + VLOG(file_references) << "new query " << query_generation; + } + run_node(node_id); +} } // namespace td diff --git a/td/telegram/FileReferenceManager.h b/td/telegram/FileReferenceManager.h index af4da2dc9..e6dcb22c9 100644 --- a/td/telegram/FileReferenceManager.h +++ b/td/telegram/FileReferenceManager.h @@ -22,14 +22,90 @@ namespace td { extern int VERBOSITY_NAME(file_references); +template +class SetWithPosition { + public: + void add(T value) { + auto it = std::find(values_.begin(), values_.end(), value); + if (it != end(values_)) { + return; + } + values_.push_back(value); + } + void remove(T value) { + auto it = std::find(values_.begin(), values_.end(), value); + if (it == end(values_)) { + return; + } + size_t i = it - values_.begin(); + values_.erase(it); + if (pos_ > i) { + pos_--; + } + } + void reset_position() { + pos_ = 0; + } + T next() { + return values_[pos_++]; + } + bool has_next() { + return pos_ < values_.size(); + } + void merge(SetWithPosition &&other) { + std::vector new_values_; + for (size_t i = 0; i < pos_; i++) { + new_values_.push_back(values_[i]); + } + for (size_t i = 0; i < other.pos_; i++) { + new_values_.push_back(other.values_[i]); + } + for (size_t i = pos_; i < values_.size(); i++) { + new_values_.push_back(values_[i]); + } + for (size_t i = other.pos_; i < other.values_.size(); i++) { + new_values_.push_back(other.values_[i]); + } + pos_ += other.values_.size(); + } + + private: + std::vector values_; + size_t pos_{0}; +}; class FileReferenceManager : public Actor { + struct Node; + public: FileSourceId create_message_file_source(FullMessageId full_message_id); - void update_file_reference(FileId file_id, vector file_source_ids, Promise<> promise); + using NodeId = FileId; + void update_file_reference(NodeId node_id, Promise<> promise); + void add_file_source(NodeId node_id, FileSourceId file_source_id); + void remove_file_source(NodeId file_id, FileSourceId file_source_id); + void merge(NodeId to_node_id, NodeId from_node_id); private: + struct Destination { + bool empty() const { + return node_id.empty(); + } + NodeId node_id; + int64 generation; + }; + struct Query { + std::vector> promises; + int32 active_queries{0}; + Destination proxy; + int64 generation; + }; + + struct Node { + SetWithPosition file_source_ids; + unique_ptr query; + }; + struct FileSourceMessage { FullMessageId full_message_id; }; @@ -60,6 +136,13 @@ class FileReferenceManager : public Actor { std::unordered_map full_message_id_to_file_source_id_; int32 last_file_source_id_{0}; + int64 query_generation{0}; + + std::unordered_map nodes_; + + void run_node(NodeId node); + void send_query(Destination dest, FileSourceId file_source_id); + Destination on_query_result(Destination dest, FileSourceId file_source_id, Status status, int32 sub = 0); }; } // namespace td diff --git a/td/telegram/files/FileManager.cpp b/td/telegram/files/FileManager.cpp index 04948e311..cf0d92a52 100644 --- a/td/telegram/files/FileManager.cpp +++ b/td/telegram/files/FileManager.cpp @@ -162,8 +162,8 @@ void FileNode::set_remote_location(const RemoteFileLocation &remote, FileLocatio void FileNode::delete_file_reference(Slice file_reference) { if (remote_.type() == RemoteFileLocation::Type::Full && remote_.full().delete_file_reference(file_reference)) { VLOG(file_references) << "Delete file reference of file " << main_file_id_; - upload_may_update_file_reference_ = true; - download_may_update_file_reference_ = true; + upload_was_update_file_reference_ = false; + download_was_update_file_reference_ = false; on_pmc_changed(); } } @@ -256,27 +256,6 @@ void FileNode::set_generate_priority(int8 download_priority, int8 upload_priorit generate_upload_priority_ = upload_priority; } -void FileNode::add_file_source(FileSourceId file_source_id) { - if (std::find(file_source_ids_.begin(), file_source_ids_.end(), file_source_id) != file_source_ids_.end()) { - return; - } - - VLOG(file_references) << "Add " << file_source_id << " to file " << main_file_id_; - upload_may_update_file_reference_ = true; - download_may_update_file_reference_ = true; - file_source_ids_.push_back(file_source_id); -} - -void FileNode::remove_file_source(FileSourceId file_source_id) { - auto it = std::find(file_source_ids_.begin(), file_source_ids_.end(), file_source_id); - if (it == file_source_ids_.end()) { - return; - } - - VLOG(file_references) << "Remove " << file_source_id << " from file " << main_file_id_; - file_source_ids_.erase(it); -} - void FileNode::on_changed() { on_pmc_changed(); on_info_changed(); @@ -1098,7 +1077,7 @@ void FileManager::cancel_download(FileNodePtr node) { send_closure(file_load_manager_, &FileLoadManager::cancel, node->download_id_); node->download_id_ = 0; node->is_download_started_ = false; - node->download_may_update_file_reference_ = !node->file_source_ids_.empty(); + node->download_was_update_file_reference_ = false; node->set_download_priority(0); } @@ -1108,7 +1087,7 @@ void FileManager::cancel_upload(FileNodePtr node) { } send_closure(file_load_manager_, &FileLoadManager::cancel, node->upload_id_); node->upload_id_ = 0; - node->upload_may_update_file_reference_ = !node->file_source_ids_.empty(); + node->upload_was_update_file_reference_ = false; node->set_upload_priority(0); } @@ -1223,9 +1202,11 @@ Result FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy node->set_local_location(other_node->local_, other_node->local_ready_size_, other_node->download_offset_, other_node->local_ready_prefix_size_); node->download_id_ = other_node->download_id_; + node->download_was_update_file_reference_ = other_node->download_was_update_file_reference_; node->is_download_started_ |= other_node->is_download_started_; node->set_download_priority(other_node->download_priority_); other_node->download_id_ = 0; + other_node->download_was_update_file_reference_ = false; other_node->is_download_started_ = false; other_node->download_priority_ = 0; other_node->download_offset_ = 0; @@ -1249,9 +1230,11 @@ Result FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy cancel_upload(node); node->set_remote_location(other_node->remote_, other_node->remote_source_, other_node->remote_ready_size_); node->upload_id_ = other_node->upload_id_; + node->upload_was_update_file_reference_ = other_node->upload_was_update_file_reference_; node->set_upload_priority(other_node->upload_priority_); node->set_upload_pause(other_node->upload_pause_); other_node->upload_id_ = 0; + other_node->upload_was_update_file_reference_ = false; other_node->upload_priority_ = 0; other_node->set_upload_pause(FileId()); } else { @@ -1297,13 +1280,15 @@ Result FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy } node->need_load_from_pmc_ |= other_node->need_load_from_pmc_; node->can_search_locally_ &= other_node->can_search_locally_; - for (auto source_id : other_node->file_source_ids_) { - node->add_file_source(source_id); - } if (main_file_id_i == other_node_i) { + send_closure(G()->file_reference_manager(), &FileReferenceManager::merge, other_node->main_file_id_, + node->main_file_id_); node->main_file_id_ = other_node->main_file_id_; node->main_file_id_priority_ = other_node->main_file_id_priority_; + } else { + send_closure(G()->file_reference_manager(), &FileReferenceManager::merge, node->main_file_id_, + other_node->main_file_id_); } bool send_updates_flag = false; @@ -1365,7 +1350,9 @@ void FileManager::add_file_source(FileId file_id, FileSourceId file_source_id) { if (!node) { return; } - node->add_file_source(file_source_id); + + send_closure(G()->file_reference_manager(), &FileReferenceManager::add_file_source, node->main_file_id_, + file_source_id); } void FileManager::remove_file_source(FileId file_id, FileSourceId file_source_id) { @@ -1374,7 +1361,8 @@ void FileManager::remove_file_source(FileId file_id, FileSourceId file_source_id if (!node) { return; } - node->remove_file_source(file_source_id); + send_closure(G()->file_reference_manager(), &FileReferenceManager::remove_file_source, node->main_file_id_, + file_source_id); } void FileManager::try_flush_node_full(FileNodePtr node, bool new_remote, bool new_local, bool new_generate, @@ -1761,25 +1749,20 @@ void FileManager::run_download(FileNodePtr node) { CHECK(node->download_id_ == 0); CHECK(!node->file_ids_.empty()); - auto file_id = node->file_ids_.back(); + auto file_id = node->main_file_id_; // If file reference is needed if (!file_view.has_active_remote_location()) { VLOG(file_references) << "run_download: Do not have valid file_reference for file " << file_id; QueryId id = queries_container_.create(Query{file_id, Query::DownloadWaitFileReferece}); node->download_id_ = id; - if (node->file_source_ids_.empty()) { - on_error(id, Status::Error("Can't download file: have no valid file reference and no valid source id")); - return; - } - if (!node->download_may_update_file_reference_) { + if (node->download_was_update_file_reference_) { on_error(id, Status::Error("Can't download file: have valid source id, but do not allowed to use it")); return; } - node->download_may_update_file_reference_ = false; + node->download_was_update_file_reference_ = true; send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id, - node->file_source_ids_, PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result res) { Status error; if (res.is_ok()) { @@ -2056,14 +2039,15 @@ void FileManager::run_upload(FileNodePtr node, std::vector bad_parts) { CHECK(node->upload_id_ == 0); if (file_view.has_remote_location() && !file_view.has_active_remote_location() && file_view.get_type() != FileType::Thumbnail && file_view.get_type() != FileType::EncryptedThumbnail && - !node->file_source_ids_.empty() && node->upload_may_update_file_reference_) { + !node->upload_was_update_file_reference_) { QueryId id = queries_container_.create(Query{file_id, Query::UploadWaitFileReference}); node->upload_id_ = id; - node->upload_may_update_file_reference_ = false; + node->upload_was_update_file_reference_ = true; send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id, - node->file_source_ids_, PromiseCreator::lambda([id, actor_id = actor_id(this)](Result res) { - send_closure(actor_id, &FileManager::on_error, id, Status::Error("FILE_UPLOAD_RESTART")); + PromiseCreator::lambda([id, actor_id = actor_id(this)](Result res) { + send_closure(actor_id, &FileManager::on_error, id, + Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE")); })); return; } @@ -2803,11 +2787,16 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type, } if (status.message() == "FILE_UPLOAD_RESTART") { + if (ends_with(status.message(), "WITH_FILE_REFERENCE")) { + node->upload_was_update_file_reference_ = true; + } run_upload(node, {}); return; } if (begins_with(status.message(), "FILE_DOWNLOAD_RESTART")) { - if (!ends_with(status.message(), "WITH_FILE_REFERENCE")) { + if (ends_with(status.message(), "WITH_FILE_REFERENCE")) { + node->download_was_update_file_reference_ = true; + } else { node->can_search_locally_ = false; } run_download(node); @@ -2863,12 +2852,14 @@ std::pair FileManager::finish_query(QueryId query_id) } if (node->download_id_ == query_id) { node->download_id_ = 0; + node->download_was_update_file_reference_ = false; node->is_download_started_ = false; node->set_download_priority(0); was_active = true; } if (node->upload_id_ == query_id) { node->upload_id_ = 0; + node->upload_was_update_file_reference_ = false; node->set_upload_priority(0); was_active = true; } diff --git a/td/telegram/files/FileManager.h b/td/telegram/files/FileManager.h index 9bd469865..477c4b2a3 100644 --- a/td/telegram/files/FileManager.h +++ b/td/telegram/files/FileManager.h @@ -81,9 +81,6 @@ class FileNode { void set_download_offset(int64 download_offset); - void add_file_source(FileSourceId file_source_id); - void remove_file_source(FileSourceId file_source_id); - void on_changed(); void on_info_changed(); void on_pmc_changed(); @@ -110,8 +107,6 @@ class FileNode { FileLoadManager::QueryId download_id_ = 0; int64 remote_ready_size_ = 0; - std::vector file_source_ids_; - unique_ptr generate_; FileLoadManager::QueryId generate_id_ = 0; @@ -151,8 +146,8 @@ class FileNode { bool pmc_changed_flag_{false}; bool info_changed_flag_{false}; - bool upload_may_update_file_reference_{false}; - bool download_may_update_file_reference_{false}; + bool upload_was_update_file_reference_{false}; + bool download_was_update_file_reference_{false}; void init_ready_size();