Move out FileDownloadManager from FileLoadManager.

This commit is contained in:
levlam 2024-07-12 18:15:44 +03:00
parent 30e4e8cecb
commit fd822a6372
7 changed files with 372 additions and 331 deletions

View File

@ -401,6 +401,7 @@ set(TDLIB_SOURCE_PART1
td/telegram/files/FileBitmask.cpp td/telegram/files/FileBitmask.cpp
td/telegram/files/FileDb.cpp td/telegram/files/FileDb.cpp
td/telegram/files/FileDownloader.cpp td/telegram/files/FileDownloader.cpp
td/telegram/files/FileDownloadManager.cpp
td/telegram/files/FileEncryptionKey.cpp td/telegram/files/FileEncryptionKey.cpp
td/telegram/files/FileFromBytes.cpp td/telegram/files/FileFromBytes.cpp
td/telegram/files/FileGcParameters.cpp td/telegram/files/FileGcParameters.cpp
@ -697,6 +698,7 @@ set(TDLIB_SOURCE_PART2
td/telegram/files/FileDb.h td/telegram/files/FileDb.h
td/telegram/files/FileDbId.h td/telegram/files/FileDbId.h
td/telegram/files/FileDownloader.h td/telegram/files/FileDownloader.h
td/telegram/files/FileDownloadManager.h
td/telegram/files/FileEncryptionKey.h td/telegram/files/FileEncryptionKey.h
td/telegram/files/FileFromBytes.h td/telegram/files/FileFromBytes.h
td/telegram/files/FileGcParameters.h td/telegram/files/FileGcParameters.h

View File

@ -0,0 +1,197 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileDownloadManager.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcId.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/SliceBuilder.h"
namespace td {
FileDownloadManager::Callback::~Callback() = default;
FileDownloadManager::FileDownloadManager(unique_ptr<Callback> callback, ActorShared<> parent)
: callback_(std::move(callback)), parent_(std::move(parent)) {
}
void FileDownloadManager::start_up() {
if (G()->get_option_boolean("is_premium")) {
max_download_resource_limit_ *= 8;
}
}
ActorOwn<ResourceManager> &FileDownloadManager::get_download_resource_manager(bool is_small, DcId dc_id) {
auto &actor = is_small ? download_small_resource_manager_map_[dc_id] : download_resource_manager_map_[dc_id];
if (actor.empty()) {
actor = create_actor<ResourceManager>(
PSLICE() << "DownloadResourceManager " << tag("is_small", is_small) << tag("dc_id", dc_id),
max_download_resource_limit_, ResourceManager::Mode::Baseline);
}
return actor;
}
void FileDownloadManager::download(QueryId query_id, const FullRemoteFileLocation &remote_location,
const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
int8 priority) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileDownloaderCallback>(actor_shared(this, node_id));
bool is_small = size < 20 * 1024;
node->loader_ =
create_actor<FileDownloader>("Downloader", remote_location, local, size, std::move(name), encryption_key,
is_small, search_file, offset, limit, std::move(callback));
DcId dc_id = remote_location.is_web() ? G()->get_webfile_dc_id() : remote_location.get_dc_id();
auto &resource_manager = get_download_resource_manager(is_small, dc_id);
send_closure(resource_manager, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileDownloadManager::update_priority(QueryId query_id, int8 priority) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_priority, priority);
}
void FileDownloadManager::from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileFromBytesCallback>(actor_shared(this, node_id));
node->loader_ =
create_actor<FileFromBytes>("FromBytes", type, std::move(bytes), std::move(name), std::move(callback));
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileDownloadManager::cancel(QueryId query_id) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
on_error_impl(it->second, Status::Error(-1, "Canceled"));
}
void FileDownloadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_downloaded_part, offset, limit, max_download_resource_limit_);
}
void FileDownloadManager::hangup() {
nodes_container_.for_each([](auto query_id, auto &node) { node.loader_.reset(); });
stop_flag_ = true;
loop();
}
void FileDownloadManager::on_start_download() {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_start_download(node->query_id_);
}
}
void FileDownloadManager::on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_partial_download(node->query_id_, std::move(partial_local), ready_size, size);
}
}
void FileDownloadManager::on_ok_download(FullLocalFileLocation local, int64 size, bool is_new) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_download_ok(node->query_id_, std::move(local), size, is_new);
}
close_node(node_id);
loop();
}
void FileDownloadManager::on_error(Status status) {
auto node_id = get_link_token();
on_error_impl(node_id, std::move(status));
}
void FileDownloadManager::on_error_impl(NodeId node_id, Status status) {
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
status.ignore();
return;
}
if (!stop_flag_) {
callback_->on_error(node->query_id_, std::move(status));
}
close_node(node_id);
loop();
}
void FileDownloadManager::hangup_shared() {
auto node_id = get_link_token();
on_error_impl(node_id, Status::Error(-1, "Canceled"));
}
void FileDownloadManager::loop() {
if (stop_flag_ && nodes_container_.empty()) {
stop();
}
}
void FileDownloadManager::close_node(NodeId node_id) {
auto node = nodes_container_.get(node_id);
CHECK(node);
query_id_to_node_id_.erase(node->query_id_);
nodes_container_.erase(node_id);
}
} // namespace td

View File

@ -0,0 +1,127 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/files/FileDownloader.h"
#include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileFromBytes.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/net/DcId.h"
#include "td/actor/actor.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Container.h"
#include "td/utils/Promise.h"
#include "td/utils/Status.h"
#include <map>
namespace td {
class FileDownloadManager final : public Actor {
public:
using QueryId = uint64;
class Callback {
public:
virtual ~Callback();
virtual void on_start_download(QueryId query_id) = 0;
virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size,
int64 size) = 0;
virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0;
virtual void on_error(QueryId query_id, Status status) = 0;
};
explicit FileDownloadManager(unique_ptr<Callback> callback, ActorShared<> parent);
void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local,
int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset,
int64 limit, int8 priority);
void update_priority(QueryId query_id, int8 priority);
void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name);
void cancel(QueryId query_id);
void update_downloaded_part(QueryId query_id, int64 offset, int64 limit);
private:
struct Node {
QueryId query_id_;
ActorOwn<FileLoaderActor> loader_;
ResourceState resource_state_;
};
using NodeId = uint64;
std::map<DcId, ActorOwn<ResourceManager>> download_resource_manager_map_;
std::map<DcId, ActorOwn<ResourceManager>> download_small_resource_manager_map_;
Container<Node> nodes_container_;
unique_ptr<Callback> callback_;
ActorShared<> parent_;
std::map<QueryId, NodeId> query_id_to_node_id_;
int64 max_download_resource_limit_ = 1 << 21;
bool stop_flag_ = false;
void start_up() final;
void loop() final;
void hangup() final;
void hangup_shared() final;
void close_node(NodeId node_id);
ActorOwn<ResourceManager> &get_download_resource_manager(bool is_small, DcId dc_id);
void on_start_download();
void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size);
void on_ok_download(FullLocalFileLocation local, int64 size, bool is_new);
void on_error(Status status);
void on_error_impl(NodeId node_id, Status status);
class FileDownloaderCallback final : public FileDownloader::Callback {
public:
explicit FileDownloaderCallback(ActorShared<FileDownloadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileDownloadManager> actor_id_;
void on_start_download() final {
send_closure(actor_id_, &FileDownloadManager::on_start_download);
}
void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size) final {
send_closure(actor_id_, &FileDownloadManager::on_partial_download, std::move(partial_local), ready_size, size);
}
void on_ok(FullLocalFileLocation full_local, int64 size, bool is_new) final {
send_closure(std::move(actor_id_), &FileDownloadManager::on_ok_download, std::move(full_local), size, is_new);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileDownloadManager::on_error, std::move(status));
}
};
class FileFromBytesCallback final : public FileFromBytes::Callback {
public:
explicit FileFromBytesCallback(ActorShared<FileDownloadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileDownloadManager> actor_id_;
void on_ok(const FullLocalFileLocation &full_local, int64 size) final {
send_closure(std::move(actor_id_), &FileDownloadManager::on_ok_download, full_local, size, true);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileDownloadManager::on_error, std::move(status));
}
};
};
} // namespace td

View File

@ -6,93 +6,11 @@
// //
#include "td/telegram/files/FileLoadManager.h" #include "td/telegram/files/FileLoadManager.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcId.h"
#include "td/utils/common.h"
#include "td/utils/filesystem.h" #include "td/utils/filesystem.h"
#include "td/utils/format.h"
#include "td/utils/port/path.h" #include "td/utils/port/path.h"
#include "td/utils/SliceBuilder.h"
namespace td { namespace td {
FileLoadManager::Callback::~Callback() = default;
FileLoadManager::FileLoadManager(unique_ptr<Callback> callback, ActorShared<> parent)
: callback_(std::move(callback)), parent_(std::move(parent)) {
}
void FileLoadManager::start_up() {
if (G()->get_option_boolean("is_premium")) {
max_download_resource_limit_ *= 8;
}
}
ActorOwn<ResourceManager> &FileLoadManager::get_download_resource_manager(bool is_small, DcId dc_id) {
auto &actor = is_small ? download_small_resource_manager_map_[dc_id] : download_resource_manager_map_[dc_id];
if (actor.empty()) {
actor = create_actor<ResourceManager>(
PSLICE() << "DownloadResourceManager " << tag("is_small", is_small) << tag("dc_id", dc_id),
max_download_resource_limit_, ResourceManager::Mode::Baseline);
}
return actor;
}
void FileLoadManager::download(QueryId query_id, const FullRemoteFileLocation &remote_location,
const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
int8 priority) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileDownloaderCallback>(actor_shared(this, node_id));
bool is_small = size < 20 * 1024;
node->loader_ =
create_actor<FileDownloader>("Downloader", remote_location, local, size, std::move(name), encryption_key,
is_small, search_file, offset, limit, std::move(callback));
DcId dc_id = remote_location.is_web() ? G()->get_webfile_dc_id() : remote_location.get_dc_id();
auto &resource_manager = get_download_resource_manager(is_small, dc_id);
send_closure(resource_manager, &ResourceManager::register_worker,
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileLoadManager::update_priority(QueryId query_id, int8 priority) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_priority, priority);
}
void FileLoadManager::from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name) {
if (stop_flag_) {
return;
}
NodeId node_id = nodes_container_.create(Node());
Node *node = nodes_container_.get(node_id);
CHECK(node);
node->query_id_ = query_id;
auto callback = make_unique<FileFromBytesCallback>(actor_shared(this, node_id));
node->loader_ =
create_actor<FileFromBytes>("FromBytes", type, std::move(bytes), std::move(name), std::move(callback));
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
CHECK(is_inserted);
}
void FileLoadManager::get_content(string file_path, Promise<BufferSlice> promise) { void FileLoadManager::get_content(string file_path, Promise<BufferSlice> promise) {
promise.set_result(read_file(file_path)); promise.set_result(read_file(file_path));
} }
@ -120,107 +38,4 @@ void FileLoadManager::check_partial_local_location(PartialLocalFileLocation part
} }
} }
void FileLoadManager::cancel(QueryId query_id) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
on_error_impl(it->second, Status::Error(-1, "Canceled"));
}
void FileLoadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(query_id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_downloaded_part, offset, limit, max_download_resource_limit_);
}
void FileLoadManager::hangup() {
nodes_container_.for_each([](auto query_id, auto &node) { node.loader_.reset(); });
stop_flag_ = true;
loop();
}
void FileLoadManager::on_start_download() {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_start_download(node->query_id_);
}
}
void FileLoadManager::on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_partial_download(node->query_id_, std::move(partial_local), ready_size, size);
}
}
void FileLoadManager::on_ok_download(FullLocalFileLocation local, int64 size, bool is_new) {
auto node_id = get_link_token();
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
return;
}
if (!stop_flag_) {
callback_->on_download_ok(node->query_id_, std::move(local), size, is_new);
}
close_node(node_id);
loop();
}
void FileLoadManager::on_error(Status status) {
auto node_id = get_link_token();
on_error_impl(node_id, std::move(status));
}
void FileLoadManager::on_error_impl(NodeId node_id, Status status) {
auto node = nodes_container_.get(node_id);
if (node == nullptr) {
status.ignore();
return;
}
if (!stop_flag_) {
callback_->on_error(node->query_id_, std::move(status));
}
close_node(node_id);
loop();
}
void FileLoadManager::hangup_shared() {
auto node_id = get_link_token();
on_error_impl(node_id, Status::Error(-1, "Canceled"));
}
void FileLoadManager::loop() {
if (stop_flag_ && nodes_container_.empty()) {
stop();
}
}
void FileLoadManager::close_node(NodeId node_id) {
auto node = nodes_container_.get(node_id);
CHECK(node);
query_id_to_node_id_.erase(node->query_id_);
nodes_container_.erase(node_id);
}
} // namespace td } // namespace td

View File

@ -6,54 +6,19 @@
// //
#pragma once #pragma once
#include "td/telegram/files/FileDownloader.h"
#include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileFromBytes.h"
#include "td/telegram/files/FileLoaderUtils.h" #include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileLocation.h" #include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/net/DcId.h"
#include "td/actor/actor.h" #include "td/actor/actor.h"
#include "td/utils/buffer.h" #include "td/utils/buffer.h"
#include "td/utils/common.h" #include "td/utils/common.h"
#include "td/utils/Container.h"
#include "td/utils/Promise.h" #include "td/utils/Promise.h"
#include "td/utils/Status.h"
#include <map>
namespace td { namespace td {
class FileLoadManager final : public Actor { class FileLoadManager final : public Actor {
public: public:
using QueryId = uint64;
class Callback {
public:
virtual ~Callback();
virtual void on_start_download(QueryId query_id) = 0;
virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size,
int64 size) = 0;
virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0;
virtual void on_error(QueryId query_id, Status status) = 0;
};
explicit FileLoadManager(unique_ptr<Callback> callback, ActorShared<> parent);
void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local,
int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset,
int64 limit, int8 priority);
void update_priority(QueryId query_id, int8 priority);
void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name);
void cancel(QueryId query_id);
void update_downloaded_part(QueryId query_id, int64 offset, int64 limit);
void get_content(string file_path, Promise<BufferSlice> promise); void get_content(string file_path, Promise<BufferSlice> promise);
void read_file_part(string file_path, int64 offset, int64 count, Promise<string> promise); void read_file_part(string file_path, int64 offset, int64 count, Promise<string> promise);
@ -64,76 +29,6 @@ class FileLoadManager final : public Actor {
Promise<FullLocalLocationInfo> promise); Promise<FullLocalLocationInfo> promise);
void check_partial_local_location(PartialLocalFileLocation partial, Promise<Unit> promise); void check_partial_local_location(PartialLocalFileLocation partial, Promise<Unit> promise);
private:
struct Node {
QueryId query_id_;
ActorOwn<FileLoaderActor> loader_;
ResourceState resource_state_;
};
using NodeId = uint64;
std::map<DcId, ActorOwn<ResourceManager>> download_resource_manager_map_;
std::map<DcId, ActorOwn<ResourceManager>> download_small_resource_manager_map_;
Container<Node> nodes_container_;
unique_ptr<Callback> callback_;
ActorShared<> parent_;
std::map<QueryId, NodeId> query_id_to_node_id_;
int64 max_download_resource_limit_ = 1 << 21;
bool stop_flag_ = false;
void start_up() final;
void loop() final;
void hangup() final;
void hangup_shared() final;
void close_node(NodeId node_id);
ActorOwn<ResourceManager> &get_download_resource_manager(bool is_small, DcId dc_id);
void on_start_download();
void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size);
void on_ok_download(FullLocalFileLocation local, int64 size, bool is_new);
void on_error(Status status);
void on_error_impl(NodeId node_id, Status status);
class FileDownloaderCallback final : public FileDownloader::Callback {
public:
explicit FileDownloaderCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileLoadManager> actor_id_;
void on_start_download() final {
send_closure(actor_id_, &FileLoadManager::on_start_download);
}
void on_partial_download(PartialLocalFileLocation partial_local, int64 ready_size, int64 size) final {
send_closure(actor_id_, &FileLoadManager::on_partial_download, std::move(partial_local), ready_size, size);
}
void on_ok(FullLocalFileLocation full_local, int64 size, bool is_new) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_ok_download, std::move(full_local), size, is_new);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status));
}
};
class FileFromBytesCallback final : public FileFromBytes::Callback {
public:
explicit FileFromBytesCallback(ActorShared<FileLoadManager> actor_id) : actor_id_(std::move(actor_id)) {
}
private:
ActorShared<FileLoadManager> actor_id_;
void on_ok(const FullLocalFileLocation &full_local, int64 size) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_ok_download, full_local, size, true);
}
void on_error(Status status) final {
send_closure(std::move(actor_id_), &FileLoadManager::on_error, std::move(status));
}
};
}; };
} // namespace td } // namespace td

View File

@ -886,9 +886,10 @@ FileManager::FileManager(unique_ptr<Context> context) : context_(std::move(conte
} }
void FileManager::init_actor() { void FileManager::init_actor() {
file_load_manager_ = create_actor_on_scheduler<FileLoadManager>("FileLoadManager", G()->get_slow_net_scheduler_id(), file_download_manager_ = create_actor_on_scheduler<FileDownloadManager>(
make_unique<FileLoadManagerCallback>(actor_id(this)), "FileDownloadManager", G()->get_slow_net_scheduler_id(), make_unique<FileDownloadManagerCallback>(actor_id(this)),
context_->create_reference()); context_->create_reference());
file_load_manager_ = create_actor_on_scheduler<FileLoadManager>("FileLoadManager", G()->get_slow_net_scheduler_id());
file_upload_manager_ = create_actor_on_scheduler<FileUploadManager>( file_upload_manager_ = create_actor_on_scheduler<FileUploadManager>(
"FileUploadManager", G()->get_slow_net_scheduler_id(), make_unique<FileUploadManagerCallback>(actor_id(this)), "FileUploadManager", G()->get_slow_net_scheduler_id(), make_unique<FileUploadManagerCallback>(actor_id(this)),
context_->create_reference()); context_->create_reference());
@ -1600,7 +1601,7 @@ void FileManager::do_cancel_download(FileNodePtr node) {
if (node->download_id_ == 0) { if (node->download_id_ == 0) {
return; return;
} }
send_closure(file_load_manager_, &FileLoadManager::cancel, node->download_id_); send_closure(file_download_manager_, &FileDownloadManager::cancel, node->download_id_);
node->download_id_ = 0; node->download_id_ = 0;
node->is_download_started_ = false; node->is_download_started_ = false;
node->download_was_update_file_reference_ = false; node->download_was_update_file_reference_ = false;
@ -2246,11 +2247,11 @@ bool FileManager::set_content(FileId file_id, BufferSlice bytes) {
node->set_download_priority(FROM_BYTES_PRIORITY); node->set_download_priority(FROM_BYTES_PRIORITY);
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::SetContent}); FileDownloadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::SetContent});
node->download_id_ = query_id; node->download_id_ = query_id;
node->is_download_started_ = true; node->is_download_started_ = true;
send_closure(file_load_manager_, &FileLoadManager::from_bytes, query_id, node->remote_.full.value().file_type_, send_closure(file_download_manager_, &FileDownloadManager::from_bytes, query_id,
std::move(bytes), node->suggested_path()); node->remote_.full.value().file_type_, std::move(bytes), node->suggested_path());
return true; return true;
} }
@ -2532,7 +2533,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
LOG(INFO) << "Update download offset and limits of file " << node->main_file_id_; LOG(INFO) << "Update download offset and limits of file " << node->main_file_id_;
CHECK(node->download_id_ != 0); CHECK(node->download_id_ != 0);
if (force_update_priority || priority != old_priority) { if (force_update_priority || priority != old_priority) {
send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority); send_closure(file_download_manager_, &FileDownloadManager::update_priority, node->download_id_, priority);
} }
if (need_update_limit || need_update_offset) { if (need_update_limit || need_update_offset) {
auto download_offset = node->download_offset_; auto download_offset = node->download_offset_;
@ -2543,8 +2544,8 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
download_limit += download_offset; download_limit += download_offset;
download_offset = 0; download_offset = 0;
} }
send_closure(file_load_manager_, &FileLoadManager::update_downloaded_part, node->download_id_, download_offset, send_closure(file_download_manager_, &FileDownloadManager::update_downloaded_part, node->download_id_,
download_limit); download_offset, download_limit);
} }
return; return;
} }
@ -2555,7 +2556,8 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
if (node->need_reload_photo_ && file_view.may_reload_photo()) { if (node->need_reload_photo_ && file_view.may_reload_photo()) {
LOG(INFO) << "Reload photo from file " << node->main_file_id_; LOG(INFO) << "Reload photo from file " << node->main_file_id_;
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::DownloadReloadDialog}); FileDownloadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::DownloadReloadDialog});
node->download_id_ = query_id; node->download_id_ = query_id;
context_->reload_photo(file_view.remote_location().get_source(), context_->reload_photo(file_view.remote_location().get_source(),
PromiseCreator::lambda([actor_id = actor_id(this), query_id, file_id](Result<Unit> res) { PromiseCreator::lambda([actor_id = actor_id(this), query_id, file_id](Result<Unit> res) {
@ -2576,7 +2578,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
// If file reference is needed // If file reference is needed
if (!file_view.has_active_download_remote_location()) { if (!file_view.has_active_download_remote_location()) {
VLOG(file_references) << "Do not have valid file_reference for file " << file_id; VLOG(file_references) << "Do not have valid file_reference for file " << file_id;
FileLoadManager::QueryId query_id = FileDownloadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference}); queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference});
node->download_id_ = query_id; node->download_id_ = query_id;
if (node->download_was_update_file_reference_) { if (node->download_was_update_file_reference_) {
@ -2598,7 +2600,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
return; return;
} }
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Download}); FileDownloadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Download});
node->download_id_ = query_id; node->download_id_ = query_id;
node->is_download_started_ = false; node->is_download_started_ = false;
LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from " LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from "
@ -2612,9 +2614,9 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
download_limit += download_offset; download_limit += download_offset;
download_offset = 0; download_offset = 0;
} }
send_closure(file_load_manager_, &FileLoadManager::download, query_id, node->remote_.full.value(), node->local_, send_closure(file_download_manager_, &FileDownloadManager::download, query_id, node->remote_.full.value(),
node->size_, node->suggested_path(), node->encryption_key_, node->can_search_locally_, download_offset, node->local_, node->size_, node->suggested_path(), node->encryption_key_, node->can_search_locally_,
download_limit, priority); download_offset, download_limit, priority);
} }
class FileManager::ForceUploadActor final : public Actor { class FileManager::ForceUploadActor final : public Actor {
@ -2991,7 +2993,7 @@ void FileManager::run_generate(FileNodePtr node) {
return; return;
} }
FileLoadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Generate}); FileDownloadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Generate});
node->generate_id_ = query_id; node->generate_id_ = query_id;
send_closure(file_generate_manager_, &FileGenerateManager::generate_file, query_id, *node->generate_, node->local_, send_closure(file_generate_manager_, &FileGenerateManager::generate_file, query_id, *node->generate_, node->local_,
node->suggested_path(), [file_manager = this, query_id] { node->suggested_path(), [file_manager = this, query_id] {
@ -3000,7 +3002,7 @@ void FileManager::run_generate(FileNodePtr node) {
uint64 query_id_; uint64 query_id_;
public: public:
Callback(ActorId<FileManager> actor, FileLoadManager::QueryId query_id) Callback(ActorId<FileManager> actor, FileDownloadManager::QueryId query_id)
: actor_(std::move(actor)), query_id_(query_id) { : actor_(std::move(actor)), query_id_(query_id) {
} }
void on_partial_generate(PartialLocalFileLocation partial_local, int64 expected_size) final { void on_partial_generate(PartialLocalFileLocation partial_local, int64 expected_size) final {
@ -3744,7 +3746,7 @@ FileManager::FileNodeId FileManager::next_file_node_id() {
return res; return res;
} }
void FileManager::on_start_download(FileLoadManager::QueryId query_id) { void FileManager::on_start_download(FileDownloadManager::QueryId query_id) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
@ -3766,7 +3768,7 @@ void FileManager::on_start_download(FileLoadManager::QueryId query_id) {
file_node->is_download_started_ = true; file_node->is_download_started_ = true;
} }
void FileManager::on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, void FileManager::on_partial_download(FileDownloadManager::QueryId query_id, PartialLocalFileLocation partial_local,
int64 ready_size, int64 size) { int64 ready_size, int64 size) {
if (is_closed_) { if (is_closed_) {
return; return;
@ -3796,7 +3798,7 @@ void FileManager::on_partial_download(FileLoadManager::QueryId query_id, Partial
try_flush_node(file_node, "on_partial_download"); try_flush_node(file_node, "on_partial_download");
} }
void FileManager::on_hash(FileLoadManager::QueryId query_id, string hash) { void FileManager::on_hash(FileUploadManager::QueryId query_id, string hash) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
@ -3845,7 +3847,7 @@ void FileManager::on_partial_upload(FileUploadManager::QueryId query_id, Partial
try_flush_node(file_node, "on_partial_upload"); try_flush_node(file_node, "on_partial_upload");
} }
void FileManager::on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, void FileManager::on_download_ok(FileDownloadManager::QueryId query_id, FullLocalFileLocation local, int64 size,
bool is_new) { bool is_new) {
if (is_closed_) { if (is_closed_) {
return; return;
@ -3961,7 +3963,7 @@ void FileManager::on_upload_full_ok(FileUploadManager::QueryId query_id, FullRem
LOG_STATUS(merge(new_file_id, file_id)); LOG_STATUS(merge(new_file_id, file_id));
} }
void FileManager::on_partial_generate(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, void FileManager::on_partial_generate(FileDownloadManager::QueryId query_id, PartialLocalFileLocation partial_local,
int64 expected_size) { int64 expected_size) {
if (is_closed_) { if (is_closed_) {
return; return;
@ -3998,7 +4000,7 @@ void FileManager::on_partial_generate(FileLoadManager::QueryId query_id, Partial
try_flush_node(file_node, "on_partial_generate"); try_flush_node(file_node, "on_partial_generate");
} }
void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local) { void FileManager::on_generate_ok(FileDownloadManager::QueryId query_id, FullLocalFileLocation local) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
@ -4042,7 +4044,7 @@ void FileManager::on_generate_ok(FileLoadManager::QueryId query_id, FullLocalFil
} }
} }
void FileManager::on_download_error(FileLoadManager::QueryId query_id, Status status) { void FileManager::on_download_error(FileDownloadManager::QueryId query_id, Status status) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
@ -4283,7 +4285,7 @@ Result<string> FileManager::get_suggested_file_name(FileId file_id, const string
void FileManager::hangup() { void FileManager::hangup() {
file_db_.reset(); file_db_.reset();
file_generate_manager_.reset(); file_generate_manager_.reset();
file_load_manager_.reset(); file_download_manager_.reset();
file_upload_manager_.reset(); file_upload_manager_.reset();
while (!queries_container_.empty()) { while (!queries_container_.empty()) {
auto query_ids = queries_container_.ids(); auto query_ids = queries_container_.ids();

View File

@ -8,6 +8,7 @@
#include "td/telegram/DialogId.h" #include "td/telegram/DialogId.h"
#include "td/telegram/files/FileDbId.h" #include "td/telegram/files/FileDbId.h"
#include "td/telegram/files/FileDownloadManager.h"
#include "td/telegram/files/FileEncryptionKey.h" #include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileGenerateManager.h" #include "td/telegram/files/FileGenerateManager.h"
#include "td/telegram/files/FileId.h" #include "td/telegram/files/FileId.h"
@ -143,10 +144,10 @@ class FileNode {
NewRemoteFileLocation remote_; NewRemoteFileLocation remote_;
FileLoadManager::QueryId download_id_ = 0; FileDownloadManager::QueryId download_id_ = 0;
unique_ptr<FullGenerateFileLocation> generate_; unique_ptr<FullGenerateFileLocation> generate_;
FileLoadManager::QueryId generate_id_ = 0; FileDownloadManager::QueryId generate_id_ = 0;
int64 size_ = 0; int64 size_ = 0;
int64 expected_size_ = 0; int64 expected_size_ = 0;
@ -560,28 +561,29 @@ class FileManager final : public Actor {
FileId parse_file(ParserT &parser); FileId parse_file(ParserT &parser);
private: private:
class FileLoadManagerCallback final : public FileLoadManager::Callback { class FileDownloadManagerCallback final : public FileDownloadManager::Callback {
public: public:
explicit FileLoadManagerCallback(ActorId<FileManager> actor_id) : actor_id_(std::move(actor_id)) { explicit FileDownloadManagerCallback(ActorId<FileManager> actor_id) : actor_id_(std::move(actor_id)) {
} }
private: private:
ActorId<FileManager> actor_id_; ActorId<FileManager> actor_id_;
void on_start_download(FileLoadManager::QueryId query_id) final { void on_start_download(FileDownloadManager::QueryId query_id) final {
send_closure(actor_id_, &FileManager::on_start_download, query_id); send_closure(actor_id_, &FileManager::on_start_download, query_id);
} }
void on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, void on_partial_download(FileDownloadManager::QueryId query_id, PartialLocalFileLocation partial_local,
int64 ready_size, int64 size) final { int64 ready_size, int64 size) final {
send_closure(actor_id_, &FileManager::on_partial_download, query_id, std::move(partial_local), ready_size, size); send_closure(actor_id_, &FileManager::on_partial_download, query_id, std::move(partial_local), ready_size, size);
} }
void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) final { void on_download_ok(FileDownloadManager::QueryId query_id, FullLocalFileLocation local, int64 size,
bool is_new) final {
send_closure(actor_id_, &FileManager::on_download_ok, query_id, std::move(local), size, is_new); send_closure(actor_id_, &FileManager::on_download_ok, query_id, std::move(local), size, is_new);
} }
void on_error(FileLoadManager::QueryId query_id, Status status) final { void on_error(FileDownloadManager::QueryId query_id, Status status) final {
send_closure(actor_id_, &FileManager::on_download_error, query_id, std::move(status)); send_closure(actor_id_, &FileManager::on_download_error, query_id, std::move(status));
} }
}; };
@ -692,6 +694,7 @@ class FileManager final : public Actor {
WaitFreeVector<FileIdInfo> file_id_info_; WaitFreeVector<FileIdInfo> file_id_info_;
WaitFreeVector<int32> empty_file_ids_; WaitFreeVector<int32> empty_file_ids_;
WaitFreeVector<unique_ptr<FileNode>> file_nodes_; WaitFreeVector<unique_ptr<FileNode>> file_nodes_;
ActorOwn<FileDownloadManager> file_download_manager_;
ActorOwn<FileLoadManager> file_load_manager_; ActorOwn<FileLoadManager> file_load_manager_;
ActorOwn<FileUploadManager> file_upload_manager_; ActorOwn<FileUploadManager> file_upload_manager_;
ActorOwn<FileGenerateManager> file_generate_manager_; ActorOwn<FileGenerateManager> file_generate_manager_;
@ -767,11 +770,11 @@ class FileManager final : public Actor {
void run_download(FileNodePtr node, bool force_update_priority); void run_download(FileNodePtr node, bool force_update_priority);
void run_generate(FileNodePtr node); void run_generate(FileNodePtr node);
void on_start_download(FileLoadManager::QueryId query_id); void on_start_download(FileDownloadManager::QueryId query_id);
void on_partial_download(FileLoadManager::QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size, void on_partial_download(FileDownloadManager::QueryId query_id, PartialLocalFileLocation partial_local,
int64 size); int64 ready_size, int64 size);
void on_download_ok(FileLoadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new); void on_download_ok(FileDownloadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new);
void on_download_error(FileLoadManager::QueryId query_id, Status status); void on_download_error(FileDownloadManager::QueryId query_id, Status status);
void on_hash(FileUploadManager::QueryId query_id, string hash); void on_hash(FileUploadManager::QueryId query_id, string hash);
void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote, void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
@ -783,8 +786,8 @@ class FileManager final : public Actor {
void on_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status); void on_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status);
void on_partial_generate(FileLoadManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size); void on_partial_generate(FileDownloadManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size);
void on_generate_ok(FileLoadManager::QueryId, FullLocalFileLocation local); void on_generate_ok(FileDownloadManager::QueryId, FullLocalFileLocation local);
std::pair<Query, bool> finish_query(Container<Query>::Id query_id); std::pair<Query, bool> finish_query(Container<Query>::Id query_id);