Add separate FileUploadManager.

This commit is contained in:
levlam 2024-07-12 17:27:26 +03:00
parent 82807ea456
commit 30e4e8cecb
7 changed files with 423 additions and 209 deletions

View File

@ -415,6 +415,7 @@ set(TDLIB_SOURCE_PART1
td/telegram/files/FileStatsWorker.cpp td/telegram/files/FileStatsWorker.cpp
td/telegram/files/FileType.cpp td/telegram/files/FileType.cpp
td/telegram/files/FileUploader.cpp td/telegram/files/FileUploader.cpp
td/telegram/files/FileUploadManager.cpp
td/telegram/files/PartsManager.cpp td/telegram/files/PartsManager.cpp
td/telegram/files/ResourceManager.cpp td/telegram/files/ResourceManager.cpp
td/telegram/ForumTopic.cpp td/telegram/ForumTopic.cpp
@ -714,6 +715,7 @@ set(TDLIB_SOURCE_PART2
td/telegram/files/FileStatsWorker.h td/telegram/files/FileStatsWorker.h
td/telegram/files/FileType.h td/telegram/files/FileType.h
td/telegram/files/FileUploader.h td/telegram/files/FileUploader.h
td/telegram/files/FileUploadManager.h
td/telegram/files/PartsManager.h td/telegram/files/PartsManager.h
td/telegram/files/ResourceManager.h td/telegram/files/ResourceManager.h
td/telegram/files/ResourceState.h td/telegram/files/ResourceState.h

View File

@ -24,10 +24,6 @@ FileLoadManager::FileLoadManager(unique_ptr<Callback> callback, ActorShared<> pa
} }
void FileLoadManager::start_up() { void FileLoadManager::start_up() {
constexpr int64 MAX_UPLOAD_RESOURCE_LIMIT = 4 << 20;
upload_resource_manager_ = create_actor<ResourceManager>(
"UploadResourceManager", MAX_UPLOAD_RESOURCE_LIMIT,
!G()->keep_media_order() ? ResourceManager::Mode::Greedy : ResourceManager::Mode::Baseline);
if (G()->get_option_boolean("is_premium")) { if (G()->get_option_boolean("is_premium")) {
max_download_resource_limit_ *= 8; max_download_resource_limit_ *= 8;
} }
@ -67,42 +63,6 @@ void FileLoadManager::download(QueryId query_id, const FullRemoteFileLocation &r
CHECK(is_inserted); CHECK(is_inserted);
} }
void FileLoadManager::upload(QueryId query_id, const LocalFileLocation &local_location,
const RemoteFileLocation &remote_location, int64 expected_size,
const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileUploaderCallback>(actor_shared(this, node_id));
node->loader_ = create_actor<FileUploader>("Uploader", local_location, remote_location, expected_size, encryption_key,
std::move(bad_parts), std::move(callback));
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileLoadManager::upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size,
int8 priority) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileHashUploaderCallback>(actor_shared(this, node_id));
node->loader_ = create_actor<FileHashUploader>("HashUploader", local_location, size, std::move(callback));
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileLoadManager::update_priority(QueryId query_id, int8 priority) { void FileLoadManager::update_priority(QueryId query_id, int8 priority) {
if (stop_flag_) { if (stop_flag_) {
return; return;
@ -171,21 +131,6 @@ void FileLoadManager::cancel(QueryId query_id) {
on_error_impl(it->second, Status::Error(-1, "Canceled")); on_error_impl(it->second, Status::Error(-1, "Canceled"));
} }
void FileLoadManager::update_local_file_location(QueryId query_id, const LocalFileLocation &local) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local);
}
void FileLoadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) { void FileLoadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) {
if (stop_flag_) { if (stop_flag_) {
return; return;
@ -229,28 +174,6 @@ void FileLoadManager::on_partial_download(PartialLocalFileLocation partial_local
} }
} }
void FileLoadManager::on_hash(string hash) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_hash(node->query_id_, std::move(hash));
}
}
void FileLoadManager::on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_partial_upload(node->query_id_, std::move(partial_remote), ready_size);
}
}
void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bool is_new) { void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bool is_new) {
auto node_id = get_link_token(); auto node_id = get_link_token();
auto node = nodes_container_.get(node_id); auto node = nodes_container_.get(node_id);
@ -264,32 +187,6 @@ void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bo
loop(); loop();
} }
void FileLoadManager::on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_upload_ok(node->query_id_, file_type, std::move(remote), size);
}
close_node(node_id);
loop();
}
void FileLoadManager::on_ok_upload_full(FullRemoteFileLocation remote) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_upload_full_ok(node->query_id_, std::move(remote));
}
close_node(node_id);
loop();
}
void FileLoadManager::on_error(Status status) { void FileLoadManager::on_error(Status status) {
auto node_id = get_link_token(); auto node_id = get_link_token();
on_error_impl(node_id, std::move(status)); on_error_impl(node_id, std::move(status));

View File

@ -9,11 +9,9 @@
#include "td/telegram/files/FileDownloader.h" #include "td/telegram/files/FileDownloader.h"
#include "td/telegram/files/FileEncryptionKey.h" #include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileFromBytes.h" #include "td/telegram/files/FileFromBytes.h"
#include "td/telegram/files/FileHashUploader.h"
#include "td/telegram/files/FileLoaderUtils.h" #include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h" #include "td/telegram/files/FileType.h"
#include "td/telegram/files/FileUploader.h"
#include "td/telegram/files/ResourceManager.h" #include "td/telegram/files/ResourceManager.h"
#include "td/telegram/net/DcId.h" #include "td/telegram/net/DcId.h"
@ -38,10 +36,6 @@ class FileLoadManager final : public Actor {
virtual void on_start_download(QueryId query_id) = 0; virtual void on_start_download(QueryId query_id) = 0;
virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size, virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size,
int64 size) = 0; int64 size) = 0;
virtual void on_partial_upload(QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0;
virtual void on_hash(QueryId query_id, string hash) = 0;
virtual void on_upload_ok(QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, int64 size) = 0;
virtual void on_upload_full_ok(QueryId query_id, FullRemoteFileLocation remote) = 0;
virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0; virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0;
virtual void on_error(QueryId query_id, Status status) = 0; virtual void on_error(QueryId query_id, Status status) = 0;
}; };
@ -51,13 +45,13 @@ class FileLoadManager final : public Actor {
void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local,
int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset,
int64 limit, int8 priority); int64 limit, int8 priority);
void upload(QueryId query_id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts);
void upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
void update_priority(QueryId query_id, int8 priority); void update_priority(QueryId query_id, int8 priority);
void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name); void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name);
void cancel(QueryId query_id); void cancel(QueryId query_id);
void update_local_file_location(QueryId query_id, const LocalFileLocation &local);
void update_downloaded_part(QueryId query_id, int64 offset, int64 limit); void update_downloaded_part(QueryId query_id, int64 offset, int64 limit);
void get_content(string file_path, Promise<BufferSlice> promise); void get_content(string file_path, Promise<BufferSlice> promise);
@ -81,7 +75,6 @@ class FileLoadManager final : public Actor {
std::map<DcId, ActorOwn<ResourceManager>> download_resource_manager_map_; std::map<DcId, ActorOwn<ResourceManager>> download_resource_manager_map_;
std::map<DcId, ActorOwn<ResourceManager>> download_small_resource_manager_map_; std::map<DcId, ActorOwn<ResourceManager>> download_small_resource_manager_map_;
ActorOwn<ResourceManager> upload_resource_manager_;
Container<Node> nodes_container_; Container<Node> nodes_container_;
unique_ptr<Callback> callback_; unique_ptr<Callback> callback_;
@ -100,11 +93,7 @@ class FileLoadManager final : public Actor {
void on_start_download(); void on_start_download();
void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size); void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size);
void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size);
void on_hash(string hash);
void on_ok_download(FullLocalFileLocation local, int64 size, bool is_new); void on_ok_download(FullLocalFileLocation local, int64 size, bool is_new);
void on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size);
void on_ok_upload_full(FullRemoteFileLocation remote);
void on_error(Status status); void on_error(Status status);
void on_error_impl(NodeId node_id, Status status); void on_error_impl(NodeId node_id, Status status);
@ -130,43 +119,6 @@ class FileLoadManager final : public Actor {
} }
}; };
class FileUploaderCallback final : public FileUploader::Callback {
public:
explicit FileUploaderCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileLoadManager> actor_id_;
void on_hash(string hash) final {
send_closure(actor_id_, &FileLoadManager::on_hash, std::move(hash));
}
void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) final {
send_closure(actor_id_, &FileLoadManager::on_partial_upload, std::move(partial_remote), ready_size);
}
void on_ok(FileType file_type, PartialRemoteFileLocation partial_remote, int64 size) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_ok_upload, file_type, std::move(partial_remote), size);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status));
}
};
class FileHashUploaderCallback final : public FileHashUploader::Callback {
public:
explicit FileHashUploaderCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileLoadManager> actor_id_;
void on_ok(FullRemoteFileLocation remote) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_ok_upload_full, std::move(remote));
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status));
}
};
class FileFromBytesCallback final : public FileFromBytes::Callback { class FileFromBytesCallback final : public FileFromBytes::Callback {
public: public:
explicit FileFromBytesCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) { explicit FileFromBytesCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) {

View File

@ -889,6 +889,9 @@ void FileManager::init_actor() {
file_load_manager_ = create_actor_on_scheduler<FileLoadManager>("FileLoadManager", G()->get_slow_net_scheduler_id(), file_load_manager_ = create_actor_on_scheduler<FileLoadManager>("FileLoadManager", G()->get_slow_net_scheduler_id(),
make_unique<FileLoadManagerCallback>(actor_id(this)), make_unique<FileLoadManagerCallback>(actor_id(this)),
context_->create_reference()); context_->create_reference());
file_upload_manager_ = create_actor_on_scheduler<FileUploadManager>(
"FileUploadManager", G()->get_slow_net_scheduler_id(), make_unique<FileUploadManagerCallback>(actor_id(this)),
context_->create_reference());
file_generate_manager_ = create_actor_on_scheduler<FileGenerateManager>( file_generate_manager_ = create_actor_on_scheduler<FileGenerateManager>(
"FileGenerateManager", G()->get_slow_net_scheduler_id(), context_->create_reference()); "FileGenerateManager", G()->get_slow_net_scheduler_id(), context_->create_reference());
} }
@ -1608,7 +1611,7 @@ void FileManager::do_cancel_upload(FileNodePtr node) {
if (node->upload_id_ == 0) { if (node->upload_id_ == 0) {
return; return;
} }
send_closure(file_load_manager_, &FileLoadManager::cancel, node->upload_id_); send_closure(file_upload_manager_, &FileUploadManager::cancel, node->upload_id_);
node->upload_id_ = 0; node->upload_id_ = 0;
node->upload_was_update_file_reference_ = false; node->upload_was_update_file_reference_ = false;
node->set_upload_priority(0); node->set_upload_priority(0);
@ -2564,7 +2567,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
} }
VLOG(file_references) VLOG(file_references)
<< "Receive result from reload photo for file " << file_id << ": " << error; << "Receive result from reload photo for file " << file_id << ": " << error;
send_closure(actor_id, &FileManager::on_error, query_id, std::move(error)); send_closure(actor_id, &FileManager::on_download_error, query_id, std::move(error));
})); }));
node->need_reload_photo_ = false; node->need_reload_photo_ = false;
return; return;
@ -2577,7 +2580,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference}); queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference});
node->download_id_ = query_id; node->download_id_ = query_id;
if (node->download_was_update_file_reference_) { if (node->download_was_update_file_reference_) {
return on_error(query_id, Status::Error("Can't download file: have no valid file reference")); return on_download_error(query_id, Status::Error("Can't download file: have no valid file reference"));
} }
node->download_was_update_file_reference_ = true; node->download_was_update_file_reference_ = true;
@ -2590,7 +2593,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
error = res.move_as_error(); error = res.move_as_error();
} }
VLOG(file_references) << "Receive result from FileSourceManager for file " << file_id << ": " << error; VLOG(file_references) << "Receive result from FileSourceManager for file " << file_id << ": " << error;
send_closure(actor_id, &FileManager::on_error, query_id, std::move(error)); send_closure(actor_id, &FileManager::on_download_error, query_id, std::move(error));
})); }));
return; return;
} }
@ -3008,7 +3011,7 @@ void FileManager::run_generate(FileNodePtr node) {
send_closure(actor_, &FileManager::on_generate_ok, query_id_, std::move(local)); send_closure(actor_, &FileManager::on_generate_ok, query_id_, std::move(local));
} }
void on_error(Status error) final { void on_error(Status error) final {
send_closure(actor_, &FileManager::on_error, query_id_, std::move(error)); send_closure(actor_, &FileManager::on_download_error, query_id_, std::move(error));
} }
}; };
return make_unique<Callback>(file_manager->actor_id(file_manager), query_id); return make_unique<Callback>(file_manager->actor_id(file_manager), query_id);
@ -3084,23 +3087,25 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
if (old_priority != 0) { if (old_priority != 0) {
LOG(INFO) << "File " << file_id << " is already uploading"; LOG(INFO) << "File " << file_id << " is already uploading";
CHECK(node->upload_id_ != 0); CHECK(node->upload_id_ != 0);
send_closure(file_load_manager_, &FileLoadManager::update_priority, node->upload_id_, narrow_cast<int8>(-priority)); send_closure(file_upload_manager_, &FileUploadManager::update_priority, node->upload_id_,
narrow_cast<int8>(-priority));
return; return;
} }
CHECK(node->upload_id_ == 0); CHECK(node->upload_id_ == 0);
if (file_view.has_alive_remote_location() && !file_view.has_active_upload_remote_location() && if (file_view.has_alive_remote_location() && !file_view.has_active_upload_remote_location() &&
can_reuse_remote_file(file_view.get_type())) { can_reuse_remote_file(file_view.get_type())) {
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference}); FileUploadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference});
node->upload_id_ = query_id; node->upload_id_ = query_id;
if (node->upload_was_update_file_reference_) { if (node->upload_was_update_file_reference_) {
return on_error(query_id, Status::Error("Can't upload file: have no valid file reference")); return on_upload_error(query_id, Status::Error("Can't upload file: have no valid file reference"));
} }
node->upload_was_update_file_reference_ = true; node->upload_was_update_file_reference_ = true;
context_->repair_file_reference(node->main_file_id_, context_->repair_file_reference(node->main_file_id_,
PromiseCreator::lambda([actor_id = actor_id(this), query_id](Result<Unit> res) { PromiseCreator::lambda([actor_id = actor_id(this), query_id](Result<Unit> res) {
send_closure(actor_id, &FileManager::on_error, query_id, send_closure(actor_id, &FileManager::on_upload_error, query_id,
Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE")); Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE"));
})); }));
return; return;
@ -3108,10 +3113,10 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
if (!node->remote_.partial && node->get_by_hash_) { if (!node->remote_.partial && node->get_by_hash_) {
LOG(INFO) << "Get file " << node->main_file_id_ << " by hash"; LOG(INFO) << "Get file " << node->main_file_id_ << " by hash";
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash}); FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash});
node->upload_id_ = query_id; node->upload_id_ = query_id;
send_closure(file_load_manager_, &FileLoadManager::upload_by_hash, query_id, node->local_.full(), node->size_, send_closure(file_upload_manager_, &FileUploadManager::upload_by_hash, query_id, node->local_.full(), node->size_,
narrow_cast<int8>(-priority)); narrow_cast<int8>(-priority));
return; return;
} }
@ -3124,12 +3129,13 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
expected_size = 10 << 20; expected_size = 10 << 20;
} }
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload}); FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload});
node->upload_id_ = query_id; node->upload_id_ = query_id;
send_closure(file_load_manager_, &FileLoadManager::upload, query_id, node->local_, node->remote_.partial_or_empty(), send_closure(file_upload_manager_, &FileUploadManager::upload, query_id, node->local_,
expected_size, node->encryption_key_, new_priority, std::move(bad_parts)); node->remote_.partial_or_empty(), expected_size, node->encryption_key_, new_priority,
std::move(bad_parts));
LOG(INFO) << "File " << file_id << " upload request has sent to FileLoadManager"; LOG(INFO) << "File " << file_id << " upload request has sent to FileUploadManager";
} }
void FileManager::upload(FileId file_id, std::shared_ptr<UploadCallback> callback, int32 new_priority, void FileManager::upload(FileId file_id, std::shared_ptr<UploadCallback> callback, int32 new_priority,
@ -3812,7 +3818,7 @@ void FileManager::on_hash(FileLoadManager::QueryId query_id, string hash) {
file_node->encryption_key_.set_value_hash(secure_storage::ValueHash::create(hash).move_as_ok()); file_node->encryption_key_.set_value_hash(secure_storage::ValueHash::create(hash).move_as_ok());
} }
void FileManager::on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, void FileManager::on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
int64 ready_size) { int64 ready_size) {
if (is_closed_) { if (is_closed_) {
return; return;
@ -3847,7 +3853,7 @@ void FileManager::on_download_ok(FileLoadManager::QueryId query_id, FullLocalFil
Query query; Query query;
bool was_active; bool was_active;
std::tie(query, was_active) = finish_query(query_id); std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
auto file_id = query.file_id_; auto file_id = query.file_id_;
LOG(INFO) << "ON DOWNLOAD OK of " << (is_new ? "new" : "checked") << " file " << file_id << " of size " << size; LOG(INFO) << "ON DOWNLOAD OK of " << (is_new ? "new" : "checked") << " file " << file_id << " of size " << size;
auto r_new_file_id = register_local(std::move(local), DialogId(), size, false, false, true, file_id); auto r_new_file_id = register_local(std::move(local), DialogId(), size, false, false, true, file_id);
@ -3865,14 +3871,14 @@ void FileManager::on_download_ok(FileLoadManager::QueryId query_id, FullLocalFil
} }
} }
void FileManager::on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, void FileManager::on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type,
PartialRemoteFileLocation partial_remote, int64 size) { PartialRemoteFileLocation partial_remote, int64 size) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
CHECK(partial_remote.ready_part_count_ == partial_remote.part_count_); CHECK(partial_remote.ready_part_count_ == partial_remote.part_count_);
auto some_file_id = finish_query(query_id).first.file_id_; auto some_file_id = finish_query(static_cast<Container<Query>::Id>(query_id)).first.file_id_;
LOG(INFO) << "ON UPLOAD OK file " << some_file_id << " of size " << size; LOG(INFO) << "ON UPLOAD OK file " << some_file_id << " of size " << size;
auto file_node = get_file_node(some_file_id); auto file_node = get_file_node(some_file_id);
@ -3944,12 +3950,12 @@ void FileManager::on_upload_ok(FileLoadManager::QueryId query_id, FileType file_
} }
// for upload by hash // for upload by hash
void FileManager::on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote) { void FileManager::on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
auto file_id = finish_query(query_id).first.file_id_; auto file_id = finish_query(static_cast<Container<Query>::Id>(query_id)).first.file_id_;
LOG(INFO) << "ON UPLOAD FULL OK for file " << file_id; LOG(INFO) << "ON UPLOAD FULL OK for file " << file_id;
auto new_file_id = register_remote(std::move(remote), FileLocationSource::FromServer, DialogId(), 0, 0, ""); auto new_file_id = register_remote(std::move(remote), FileLocationSource::FromServer, DialogId(), 0, 0, "");
LOG_STATUS(merge(new_file_id, file_id)); LOG_STATUS(merge(new_file_id, file_id));
@ -3985,7 +3991,7 @@ void FileManager::on_partial_generate(FileLoadManager::QueryId query_id, Partial
run_upload(file_node, {}); run_upload(file_node, {});
} }
if (file_node->upload_id_ != 0) { if (file_node->upload_id_ != 0) {
send_closure(file_load_manager_, &FileLoadManager::update_local_file_location, file_node->upload_id_, send_closure(file_upload_manager_, &FileUploadManager::update_local_file_location, file_node->upload_id_,
LocalFileLocation(std::move(partial_local))); LocalFileLocation(std::move(partial_local)));
} }
@ -3999,7 +4005,7 @@ void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFil
Query query; Query query;
bool was_active; bool was_active;
std::tie(query, was_active) = finish_query(query_id); std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
auto generate_file_id = query.file_id_; auto generate_file_id = query.file_id_;
LOG(INFO) << "Receive on_generate_ok for file " << generate_file_id << ": " << local; LOG(INFO) << "Receive on_generate_ok for file " << generate_file_id << ": " << local;
@ -4030,20 +4036,36 @@ void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFil
if (was_active) { if (was_active) {
if (old_upload_id != 0 && old_upload_id == file_node->upload_id_) { if (old_upload_id != 0 && old_upload_id == file_node->upload_id_) {
send_closure(file_load_manager_, &FileLoadManager::update_local_file_location, file_node->upload_id_, send_closure(file_upload_manager_, &FileUploadManager::update_local_file_location, file_node->upload_id_,
LocalFileLocation(std::move(local))); LocalFileLocation(std::move(local)));
} }
} }
} }
void FileManager::on_error(FileLoadManager::QueryId query_id, Status status) { void FileManager::on_download_error(FileLoadManager::QueryId query_id, Status status) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
Query query; Query query;
bool was_active; bool was_active;
std::tie(query, was_active) = finish_query(query_id); std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
auto node = get_file_node(query.file_id_);
if (!node) {
LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status;
return;
}
on_error_impl(node, query.type_, was_active, std::move(status));
}
void FileManager::on_upload_error(FileUploadManager::QueryId query_id, Status status) {
if (is_closed_) {
return;
}
Query query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
auto node = get_file_node(query.file_id_); auto node = get_file_node(query.file_id_);
if (!node) { if (!node) {
LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status; LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status;
@ -4204,7 +4226,7 @@ void FileManager::on_error_impl(FileNodePtr node, Query::Type type, bool was_act
} }
} }
std::pair<FileManager::Query, bool> FileManager::finish_query(FileLoadManager::QueryId query_id) { std::pair<FileManager::Query, bool> FileManager::finish_query(Container<Query>::Id query_id) {
SCOPE_EXIT { SCOPE_EXIT {
queries_container_.erase(query_id); queries_container_.erase(query_id);
}; };
@ -4262,10 +4284,17 @@ void FileManager::hangup() {
file_db_.reset(); file_db_.reset();
file_generate_manager_.reset(); file_generate_manager_.reset();
file_load_manager_.reset(); file_load_manager_.reset();
file_upload_manager_.reset();
while (!queries_container_.empty()) { while (!queries_container_.empty()) {
auto query_ids = queries_container_.ids(); auto query_ids = queries_container_.ids();
for (auto query_id : query_ids) { for (auto query_id : query_ids) {
on_error(query_id, Global::request_aborted_error()); Query query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
auto node = get_file_node(query.file_id_);
if (node) {
on_error_impl(node, query.type_, was_active, Global::request_aborted_error());
}
} }
} }
is_closed_ = true; is_closed_ = true;

View File

@ -16,6 +16,7 @@
#include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileSourceId.h" #include "td/telegram/files/FileSourceId.h"
#include "td/telegram/files/FileType.h" #include "td/telegram/files/FileType.h"
#include "td/telegram/files/FileUploadManager.h"
#include "td/telegram/Location.h" #include "td/telegram/Location.h"
#include "td/telegram/PhotoSizeSource.h" #include "td/telegram/PhotoSizeSource.h"
#include "td/telegram/td_api.h" #include "td/telegram/td_api.h"
@ -134,7 +135,7 @@ class FileNode {
static constexpr char PERSISTENT_ID_VERSION = 4; static constexpr char PERSISTENT_ID_VERSION = 4;
LocalFileLocation local_; LocalFileLocation local_;
FileLoadManager::QueryId upload_id_ = 0; FileUploadManager::QueryId upload_id_ = 0;
int64 download_offset_ = 0; int64 download_offset_ = 0;
int64 private_download_limit_ = 0; int64 private_download_limit_ = 0;
int64 local_ready_size_ = 0; // PartialLocal only int64 local_ready_size_ = 0; // PartialLocal only
@ -576,30 +577,42 @@ class FileManager final : public Actor {
send_closure(actor_id_, &FileManager::on_partial_download, query_id, std::move(partial_local), ready_size, size); send_closure(actor_id_, &FileManager::on_partial_download, query_id, std::move(partial_local), ready_size, size);
} }
void on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
int64 ready_size) final {
send_closure(actor_id_, &FileManager::on_partial_upload, query_id, std::move(partial_remote), ready_size);
}
void on_hash(FileLoadManager::QueryId query_id, string hash) final {
send_closure(actor_id_, &FileManager::on_hash, query_id, std::move(hash));
}
void on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation remote,
int64 size) final {
send_closure(actor_id_, &FileManager::on_upload_ok, query_id, file_type, std::move(remote), size);
}
void on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote) final {
send_closure(actor_id_, &FileManager::on_upload_full_ok, query_id, std::move(remote));
}
void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) final { void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) final {
send_closure(actor_id_, &FileManager::on_download_ok, query_id, std::move(local), size, is_new); send_closure(actor_id_, &FileManager::on_download_ok, query_id, std::move(local), size, is_new);
} }
void on_error(FileLoadManager::QueryId query_id, Status status) final { void on_error(FileLoadManager::QueryId query_id, Status status) final {
send_closure(actor_id_, &FileManager::on_error, query_id, std::move(status)); send_closure(actor_id_, &FileManager::on_download_error, query_id, std::move(status));
}
};
class FileUploadManagerCallback final : public FileUploadManager::Callback {
public:
explicit FileUploadManagerCallback(ActorId<FileManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorId<FileManager> actor_id_;
void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
int64 ready_size) final {
send_closure(actor_id_, &FileManager::on_partial_upload, query_id, std::move(partial_remote), ready_size);
}
void on_hash(FileUploadManager::QueryId query_id, string hash) final {
send_closure(actor_id_, &FileManager::on_hash, query_id, std::move(hash));
}
void on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation remote,
int64 size) final {
send_closure(actor_id_, &FileManager::on_upload_ok, query_id, file_type, std::move(remote), size);
}
void on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote) final {
send_closure(actor_id_, &FileManager::on_upload_full_ok, query_id, std::move(remote));
}
void on_error(FileUploadManager::QueryId query_id, Status status) final {
send_closure(actor_id_, &FileManager::on_upload_error, query_id, std::move(status));
} }
}; };
@ -680,6 +693,7 @@ class FileManager final : public Actor {
WaitFreeVector<int32> empty_file_ids_; WaitFreeVector<int32> empty_file_ids_;
WaitFreeVector<unique_ptr<FileNode>> file_nodes_; WaitFreeVector<unique_ptr<FileNode>> file_nodes_;
ActorOwn<FileLoadManager> file_load_manager_; ActorOwn<FileLoadManager> file_load_manager_;
ActorOwn<FileUploadManager> file_upload_manager_;
ActorOwn<FileGenerateManager> file_generate_manager_; ActorOwn<FileGenerateManager> file_generate_manager_;
Container<Query> queries_container_; Container<Query> queries_container_;
@ -756,20 +770,23 @@ class FileManager final : public Actor {
void on_start_download(FileLoadManager::QueryId query_id); void on_start_download(FileLoadManager::QueryId query_id);
void on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size, void on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size,
int64 size); int64 size);
void on_hash(FileLoadManager::QueryId query_id, string hash);
void on_partial_upload(FileLoadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size);
void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new); void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new);
void on_upload_ok(FileLoadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation partial_remote, void on_download_error(FileLoadManager::QueryId query_id, Status status);
void on_hash(FileUploadManager::QueryId query_id, string hash);
void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
int64 ready_size);
void on_upload_ok(FileUploadManager::QueryId query_id, FileType file_type, PartialRemoteFileLocation partial_remote,
int64 size); int64 size);
void on_upload_full_ok(FileLoadManager::QueryId query_id, FullRemoteFileLocation remote); void on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote);
void on_error(FileLoadManager::QueryId query_id, Status status); void on_upload_error(FileUploadManager::QueryId query_id, Status status);
void on_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status); void on_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status);
void on_partial_generate(FileLoadManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size); void on_partial_generate(FileLoadManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size);
void on_generate_ok(FileLoadManager::QueryId, FullLocalFileLocation local); void on_generate_ok(FileLoadManager::QueryId, FullLocalFileLocation local);
std::pair<Query, bool> finish_query(FileLoadManager::QueryId query_id); std::pair<Query, bool> finish_query(Container<Query>::Id query_id);
FullRemoteFileLocation *get_remote(int32 key); FullRemoteFileLocation *get_remote(int32 key);

View File

@ -0,0 +1,196 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileUploadManager.h"
#include "td/telegram/Global.h"
#include "td/utils/common.h"
#include "td/utils/SliceBuilder.h"
namespace td {
FileUploadManager::Callback::~Callback() = default;
FileUploadManager::FileUploadManager(unique_ptr<Callback> callback, ActorShared<> parent)
: callback_(std::move(callback)), parent_(std::move(parent)) {
}
void FileUploadManager::start_up() {
constexpr int64 MAX_UPLOAD_RESOURCE_LIMIT = 4 << 20;
upload_resource_manager_ = create_actor<ResourceManager>(
"UploadResourceManager", MAX_UPLOAD_RESOURCE_LIMIT,
!G()->keep_media_order() ? ResourceManager::Mode::Greedy : ResourceManager::Mode::Baseline);
}
void FileUploadManager::upload(QueryId query_id, const LocalFileLocation &local_location,
const RemoteFileLocation &remote_location, int64 expected_size,
const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileUploaderCallback>(actor_shared(this, node_id));
node->loader_ = create_actor<FileUploader>("Uploader", local_location, remote_location, expected_size, encryption_key,
std::move(bad_parts), std::move(callback));
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileUploadManager::upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size,
int8 priority) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileHashUploaderCallback>(actor_shared(this, node_id));
node->loader_ = create_actor<FileHashUploader>("HashUploader", local_location, size, std::move(callback));
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileUploadManager::update_priority(QueryId query_id, int8 priority) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_priority, priority);
}
void FileUploadManager::cancel(QueryId query_id) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
on_error_impl(it->second, Status::Error(-1, "Canceled"));
}
void FileUploadManager::update_local_file_location(QueryId query_id, const LocalFileLocation &local) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local);
}
void FileUploadManager::hangup() {
nodes_container_.for_each([](auto query_id, auto &node) { node.loader_.reset(); });
stop_flag_ = true;
loop();
}
void FileUploadManager::on_hash(string hash) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_hash(node->query_id_, std::move(hash));
}
}
void FileUploadManager::on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_partial_upload(node->query_id_, std::move(partial_remote), ready_size);
}
}
void FileUploadManager::on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_upload_ok(node->query_id_, file_type, std::move(remote), size);
}
close_node(node_id);
loop();
}
void FileUploadManager::on_ok_upload_full(FullRemoteFileLocation remote) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_upload_full_ok(node->query_id_, std::move(remote));
}
close_node(node_id);
loop();
}
void FileUploadManager::on_error(Status status) {
auto node_id = get_link_token();
on_error_impl(node_id, std::move(status));
}
void FileUploadManager::on_error_impl(NodeId node_id, Status status) {
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
status.ignore();
return;
}
if (!stop_flag_) {
callback_->on_error(node->query_id_, std::move(status));
}
close_node(node_id);
loop();
}
void FileUploadManager::hangup_shared() {
auto node_id = get_link_token();
on_error_impl(node_id, Status::Error(-1, "Canceled"));
}
void FileUploadManager::loop() {
if (stop_flag_ && nodes_container_.empty()) {
stop();
}
}
void FileUploadManager::close_node(NodeId node_id) {
auto node = nodes_container_.get(node_id);
CHECK(node);
query_id_to_node_id_.erase(node->query_id_);
nodes_container_.erase(node_id);
}
} // namespace td

View File

@ -0,0 +1,121 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileHashUploader.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/files/FileUploader.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/Container.h"
#include "td/utils/Status.h"
#include <map>
namespace td {
class FileUploadManager final : public Actor {
public:
using QueryId = uint64;
class Callback {
public:
virtual ~Callback();
virtual void on_partial_upload(QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0;
virtual void on_hash(QueryId query_id, string hash) = 0;
virtual void on_upload_ok(QueryId query_id, FileType file_type, PartialRemoteFileLocation remote, int64 size) = 0;
virtual void on_upload_full_ok(QueryId query_id, FullRemoteFileLocation remote) = 0;
virtual void on_error(QueryId query_id, Status status) = 0;
};
explicit FileUploadManager(unique_ptr<Callback> callback, ActorShared<> parent);
void upload(QueryId query_id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts);
void upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
void update_priority(QueryId query_id, int8 priority);
void cancel(QueryId query_id);
void update_local_file_location(QueryId query_id, const LocalFileLocation &local);
private:
struct Node {
QueryId query_id_;
ActorOwn<FileLoaderActor> loader_;
ResourceState resource_state_;
};
using NodeId = uint64;
ActorOwn<ResourceManager> upload_resource_manager_;
Container<Node> nodes_container_;
unique_ptr<Callback> callback_;
ActorShared<> parent_;
std::map<QueryId, NodeId> query_id_to_node_id_;
bool stop_flag_ = false;
void start_up() final;
void loop() final;
void hangup() final;
void hangup_shared() final;
void close_node(NodeId node_id);
void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size);
void on_hash(string hash);
void on_ok_upload(FileType file_type, PartialRemoteFileLocation remote, int64 size);
void on_ok_upload_full(FullRemoteFileLocation remote);
void on_error(Status status);
void on_error_impl(NodeId node_id, Status status);
class FileUploaderCallback final : public FileUploader::Callback {
public:
explicit FileUploaderCallback(ActorShared<FileUploadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileUploadManager> actor_id_;
void on_hash(string hash) final {
send_closure(actor_id_, &FileUploadManager::on_hash, std::move(hash));
}
void on_partial_upload(PartialRemoteFileLocation partial_remote, int64 ready_size) final {
send_closure(actor_id_, &FileUploadManager::on_partial_upload, std::move(partial_remote), ready_size);
}
void on_ok(FileType file_type, PartialRemoteFileLocation partial_remote, int64 size) final {
send_closure(std::move(actor_id_), &FileUploadManager::on_ok_upload, file_type, std::move(partial_remote), size);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileUploadManager::on_error, std::move(status));
}
};
class FileHashUploaderCallback final : public FileHashUploader::Callback {
public:
explicit FileHashUploaderCallback(ActorShared<FileUploadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileUploadManager> actor_id_;
void on_ok(FullRemoteFileLocation remote) final {
send_closure(std::move(actor_id_), &FileUploadManager::on_ok_upload_full, std::move(remote));
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileUploadManager::on_error, std::move(status));
}
};
};
} // namespace td