Merge FileLoader and FileDownloader.

This commit is contained in:
levlam 2024-07-13 23:20:13 +03:00
parent 284fe8be53
commit cd33aa82cc
7 changed files with 415 additions and 532 deletions

View File

@ -408,7 +408,6 @@ set(TDLIB_SOURCE_PART1
td/telegram/files/FileGcWorker.cpp
td/telegram/files/FileGenerateManager.cpp
td/telegram/files/FileHashUploader.cpp
td/telegram/files/FileLoader.cpp
td/telegram/files/FileLoaderUtils.cpp
td/telegram/files/FileLoadManager.cpp
td/telegram/files/FileManager.cpp
@ -707,7 +706,6 @@ set(TDLIB_SOURCE_PART2
td/telegram/files/FileHashUploader.h
td/telegram/files/FileId.h
td/telegram/files/FileLoaderActor.h
td/telegram/files/FileLoader.h
td/telegram/files/FileLoaderUtils.h
td/telegram/files/FileLoadManager.h
td/telegram/files/FileLocation.h

View File

@ -6,11 +6,15 @@
//
#include "td/telegram/files/FileDownloadManager.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/SliceBuilder.h"
namespace td {

View File

@ -10,7 +10,7 @@
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/SecureStorage.h"
#include "td/telegram/telegram_api.h"
#include "td/telegram/UniqueId.h"
@ -43,17 +43,15 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local
, callback_(std::move(callback))
, is_small_(is_small)
, need_search_file_(need_search_file)
, ordered_flag_(encryption_key_.is_secret())
, offset_(offset)
, limit_(limit) {
if (encryption_key.is_secret()) {
set_ordered_flag(true);
}
if (!encryption_key.empty()) {
CHECK(offset_ == 0);
}
}
Result<FileLoader::FileInfo> FileDownloader::init() {
Result<FileDownloader::FileInfo> FileDownloader::init() {
SCOPE_EXIT {
try_release_fd();
};
@ -122,28 +120,6 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
return res;
}
// Finishes a completed download: closes the descriptor, replaces a secure file with its decrypted
// copy, moves the file out of the temporary location unless only_check_ is set, and notifies callback_.
Status FileDownloader::on_ok(int64 size) {
  fd_.close();

  if (encryption_key_.is_secure()) {
    TRY_RESULT(file_path, open_temp_file(remote_.file_type_));
    string tmp_path;
    std::tie(std::ignore, tmp_path) = std::move(file_path);
    TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path));
    // the encrypted original is no longer needed; failure to remove it is ignored
    unlink(path_).ignore();
    path_ = std::move(tmp_path);
    // the reported size is the size of the decrypted file on disk
    TRY_RESULT(decrypted_stat, stat(path_));
    size = decrypted_stat.size_;
  }

  std::string path;
  if (!only_check_) {
    TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_));
  } else {
    path = path_;
  }

  callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_);
  return Status::OK();
}
void FileDownloader::on_error(Status status) {
fd_.close();
callback_->on_error(std::move(status));
@ -228,11 +204,11 @@ Result<NetQueryPtr> FileDownloader::start_part(Part part, int32 part_count, int6
}
// auto size = part.size;
//// sometimes we can ask more than server has, just to check size
// if (size < get_part_size()) {
// size = min(size + 16, get_part_size());
// if (size < parts_manager_.get_part_size()) {
// size = min(size + 16, parts_manager_.get_part_size());
// LOG(INFO) << "Ask " << size << " instead of " << part.size;
//}
auto size = get_part_size();
auto size = parts_manager_.get_part_size();
CHECK(part.size <= size);
callback_->on_start_download();
@ -377,28 +353,32 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
return written;
}
void FileDownloader::on_progress(Progress progress) {
if (progress.is_ready) {
void FileDownloader::on_progress() {
if (parts_manager_.ready()) {
// do not send partial location. will lead to wrong local_size
return;
}
if (progress.ready_size == 0 || path_.empty()) {
auto ready_size = parts_manager_.get_ready_size();
if (ready_size == 0 || path_.empty()) {
return;
}
auto part_size = static_cast<int32>(parts_manager_.get_part_size());
auto size = parts_manager_.get_size_or_zero();
if (encryption_key_.empty() || encryption_key_.is_secure()) {
callback_->on_partial_download(
PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_, "", std::move(progress.ready_bitmask)},
progress.ready_size, progress.size);
PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", parts_manager_.get_bitmask()}, ready_size,
size);
} else if (encryption_key_.is_secret()) {
UInt256 iv;
if (progress.ready_part_count == next_part_) {
auto ready_part_count = parts_manager_.get_ready_prefix_count();
if (ready_part_count == next_part_) {
iv = encryption_key_.mutable_iv();
} else {
LOG(FATAL) << tag("ready_part_count", progress.ready_part_count) << tag("next_part", next_part_);
LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_);
}
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_,
as_slice(iv).str(), std::move(progress.ready_bitmask)},
progress.ready_size, progress.size);
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(),
parts_manager_.get_bitmask()},
ready_size, size);
} else {
UNREACHABLE();
}
@ -412,15 +392,15 @@ Status FileDownloader::process_check_query(NetQueryPtr net_query) {
return Status::OK();
}
Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size,
bool is_ready) {
Status FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) {
if (!need_check_) {
return CheckInfo{};
return Status::OK();
}
SCOPE_EXIT {
try_release_fd();
};
CheckInfo info;
bool is_changed = false;
vector<NetQueryPtr> queries;
while (checked_prefix_size < ready_prefix_size) {
//LOG(ERROR) << "NEED TO CHECK: " << checked_prefix_size << "->" << ready_prefix_size - checked_prefix_size;
HashInfo search_info;
@ -457,7 +437,7 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si
}
checked_prefix_size = end_offset;
info.changed = true;
is_changed = true;
continue;
}
if (!has_hash_query_) {
@ -465,15 +445,26 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si
auto query = telegram_api::upload_getFileHashes(remote_.as_input_file_location(), checked_prefix_size);
auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download;
auto net_query = G()->net_query_creator().create(query, {}, remote_.get_dc_id(), net_query_type);
info.queries.push_back(std::move(net_query));
queries.push_back(std::move(net_query));
break;
}
// Should fail?
break;
}
info.need_check = need_check_;
info.checked_prefix_size = checked_prefix_size;
return std::move(info);
if (is_changed) {
on_progress();
}
for (auto &query : queries) {
G()->net_query_dispatcher().dispatch_with_callback(
std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY)));
}
if (need_check_) {
parts_manager_.set_need_check();
parts_manager_.set_checked_prefix_size(checked_prefix_size);
}
return Status::OK();
}
void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<telegram_api::fileHash>> &hashes) {
@ -487,11 +478,6 @@ void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<te
}
}
// Records whether the file descriptor should stay open and immediately applies the new setting.
void FileDownloader::keep_fd_flag(bool keep_fd) {
  keep_fd_ = keep_fd;

  // closes the descriptor right away when it must no longer be kept
  try_release_fd();
}
void FileDownloader::try_release_fd() {
if (!keep_fd_ && !fd_.empty()) {
fd_.close();
@ -509,4 +495,287 @@ Status FileDownloader::acquire_fd() {
return Status::OK();
}
// Takes ownership of the resource manager handle and sends it the current resource state.
void FileDownloader::set_resource_manager(ActorShared<ResourceManager> resource_manager) {
  resource_manager_ = std::move(resource_manager);

  send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
// Forwards the new priority to the resource manager.
void FileDownloader::update_priority(int8 priority) {
  send_closure(resource_manager_, &ResourceManager::update_priority, priority);
}
// Applies a resource update received for this loader and re-runs the main loop,
// since the new state may allow more parts to be started.
void FileDownloader::update_resources(const ResourceState &other) {
  resource_state_.update_slave(other);
  VLOG(file_loader) << "Update resources " << resource_state_;

  loop();
}
// Hangup handler: stops immediately unless a delay dispatcher exists, in which case only the
// dispatcher is reset here.
// NOTE(review): presumably the final stop() then happens in hangup_shared() with link token 1 - confirm.
void FileDownloader::hangup() {
  if (!delay_dispatcher_.empty()) {
    delay_dispatcher_.reset();
    return;
  }
  stop();
}
// Shared-reference hangup handler: stops the actor only for link token 1, which is the token
// handed to the DelayDispatcher in start_up(); other tokens are ignored.
void FileDownloader::hangup_shared() {
  if (get_link_token() != 1) {
    return;
  }
  stop();
}
// Updates the streaming window. When the offset changes, in-flight part queries outside the
// protected range [first_kept_part, kept_end_part) are cancelled; when only the limit changes,
// just the limit is updated. Afterwards the resource estimate is refreshed and the loop is run.
void FileDownloader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
  if (parts_manager_.get_streaming_offset() == offset) {
    parts_manager_.set_streaming_limit(limit);
  } else {
    auto first_kept_part = parts_manager_.set_streaming_offset(offset, limit);
    auto window_end_part = limit <= 0 ? parts_manager_.get_part_count()
                                      : narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
    // number of whole parts that fit into max_resource_limit
    auto part_budget = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size());
    auto kept_end_part = first_kept_part + td::min(part_budget, window_end_part - first_kept_part);
    VLOG(file_loader) << "Protect parts " << first_kept_part << " ... " << kept_end_part - 1;

    for (auto &entry : part_map_) {
      auto &part = entry.second.first;
      auto &cancel_token = entry.second.second;
      if (cancel_token.empty()) {
        continue;
      }
      if (part.id < first_kept_part || part.id >= kept_end_part) {
        VLOG(file_loader) << "Cancel part " << part.id;
        cancel_token.reset();  // cancel_query(it.second.second);
      }
    }
  }

  update_estimated_limit();
  loop();
}
// Actor start-up hook: obtains the initial file description from init(), initializes
// parts_manager_ with it, sets up ordered processing and the delay dispatcher when requested,
// reports the initial progress and yields.
// Any failure is reported through on_error() and sets stop_flag_.
void FileDownloader::start_up() {
  auto r_file_info = init();
  if (r_file_info.is_error()) {
    on_error(r_file_info.move_as_error());
    stop_flag_ = true;
    return;
  }
  // move the value out of the Result instead of copying it: FileInfo owns the ready_parts vector
  auto file_info = r_file_info.move_as_ok();
  auto size = file_info.size;
  auto expected_size = max(size, file_info.expected_size);
  bool is_size_final = file_info.is_size_final;
  auto part_size = file_info.part_size;
  auto &ready_parts = file_info.ready_parts;
  auto use_part_count_limit = file_info.use_part_count_limit;
  // the last argument is the is_upload flag, which is always false for a downloader
  auto status =
      parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit, false);
  LOG(DEBUG) << "Start downloading a file of size " << size << " with expected "
             << (is_size_final ? "exact" : "approximate") << " size " << expected_size << ", part size " << part_size
             << " and " << ready_parts.size() << " ready parts: " << status;
  if (status.is_error()) {
    on_error(std::move(status));
    stop_flag_ = true;
    return;
  }
  if (file_info.only_check) {
    parts_manager_.set_checked_prefix_size(0);
  }
  parts_manager_.set_streaming_offset(file_info.offset, file_info.limit);
  if (ordered_flag_) {
    // ordered processing starts right after the already ready prefix
    ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
  }
  if (file_info.need_delay) {
    // link token 1 identifies the dispatcher's reference in hangup_shared()
    delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1));
    next_delay_ = 0.05;
  }
  resource_state_.set_unit_size(parts_manager_.get_part_size());
  update_estimated_limit();
  on_progress();
  yield();
}
// Main actor loop: runs do_loop() unless the loader is already stopped.
// An error with code -1 is silently ignored; any other error is reported via on_error()
// and stops the loader.
void FileDownloader::loop() {
  if (stop_flag_) {
    return;
  }

  auto status = do_loop();
  if (!status.is_error() || status.code() == -1) {
    return;
  }

  on_error(std::move(status));
  stop_flag_ = true;
}
// Performs one scheduling pass: runs check_loop(), finalizes the file when parts_manager_ reports
// that the download may finish, and otherwise starts new part queries while resources allow.
// Returns the first error produced by any step.
Status FileDownloader::do_loop() {
TRY_STATUS(check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
parts_manager_.unchecked_ready()));
if (parts_manager_.may_finish()) {
// completion path: the descriptor is closed before the file is decrypted or moved
TRY_STATUS(parts_manager_.finish());
fd_.close();
auto size = parts_manager_.get_size();
if (encryption_key_.is_secure()) {
// secure files are decrypted into a fresh temporary file, which replaces the encrypted one;
// the reported size is then taken from the decrypted file on disk
TRY_RESULT(file_path, open_temp_file(remote_.file_type_));
string tmp_path;
std::tie(std::ignore, tmp_path) = std::move(file_path);
TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path));
unlink(path_).ignore();
path_ = std::move(tmp_path);
TRY_RESULT(path_stat, stat(path_));
size = path_stat.size_;
}
string path;
if (only_check_) {
// the file is left where it is
path = path_;
} else {
TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_));
}
callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_);
LOG(INFO) << "Bad download order rate: "
<< (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
<< debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_);
stop_flag_ = true;
return Status::OK();
}
// start new parts until less than one part size of resources is unused or no part is returned
while (true) {
if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) {
VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource";
break;
}
TRY_RESULT(part, parts_manager_.start_part());
if (part.size == 0) {
// an empty part ends the pass
break;
}
VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
resource_state_.start_use(static_cast<int64>(part.size));
TRY_RESULT(query, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset()));
// the unique identifier is used as the link token by which on_result() finds the part;
// the stored cancel-slot handle is what gets reset to cancel the query
uint64 unique_id = UniqueId::next();
part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
// part_map_[unique_id] = std::make_pair(part, query.get_weak());
auto callback = actor_shared(this, unique_id);
if (delay_dispatcher_.empty()) {
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
} else {
query->debug("sent to DelayDispatcher");
send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query),
std::move(callback), next_delay_);
// each subsequent delay is 0.8 of the previous one, but not less than 0.003
next_delay_ = max(next_delay_ * 0.8, 0.003);
}
}
return Status::OK();
}
// Actor tear-down hook: cancels every in-flight part query, clears queries that are still
// waiting in the ordered queue and silently closes the delay dispatcher, if there is one.
void FileDownloader::tear_down() {
  for (auto &entry : part_map_) {
    auto &cancel_token = entry.second.second;
    cancel_token.reset();  // cancel_query(it.second.second);
  }

  ordered_parts_.clear([](auto &&pending) { pending.second->clear(); });

  if (delay_dispatcher_.empty()) {
    return;
  }
  send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
}
// Refreshes the estimated resource limit from parts_manager_. When a resource manager is set,
// also decides whether the file descriptor must be kept open and sends the new state to the manager.
// Does nothing once the loader is stopped.
void FileDownloader::update_estimated_limit() {
  if (stop_flag_) {
    return;
  }

  auto estimated_extra = parts_manager_.get_estimated_extra();
  resource_state_.update_estimated_limit(estimated_extra);
  VLOG(file_loader) << "Update estimated limit " << estimated_extra;

  if (resource_manager_.empty()) {
    return;
  }

  // the descriptor is kept only while the active limit covers at least one part
  keep_fd_ = narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size();
  try_release_fd();
  send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
// Handles the result of a network query. Queries whose link token carries COMMON_QUERY_KEY are
// check queries and go to process_check_query(); all other results are matched to a part through
// part_map_, then either restarted or passed on to on_part_query().
void FileDownloader::on_result(NetQueryPtr query) {
if (stop_flag_) {
return;
}
auto unique_id = get_link_token();
if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) {
auto status = process_check_query(std::move(query));
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
} else {
loop();
}
return;
}
auto it = part_map_.find(unique_id);
if (it == part_map_.end()) {
LOG(WARNING) << "Receive result for unknown part";
return;
}
Part part = it->second.first;
// the query is already finished, so the stored handle is released instead of being reset
it->second.second.release();
CHECK(query->is_ready());
part_map_.erase(it);
bool next = false;
auto status = [&] {
TRY_RESULT(should_restart, should_restart_part(part, query));
// a canceled query is always restarted
if (query->is_error() && query->error().code() == NetQuery::Error::Canceled) {
should_restart = true;
}
if (should_restart) {
VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
parts_manager_.on_part_failed(part.id);
} else {
next = true;
}
return Status::OK();
}();
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
if (next) {
if (ordered_flag_) {
// with ordered_flag_ set, results go through ordered_parts_, keyed by part identifier
auto seq_no = part.id;
ordered_parts_.add(
seq_no, std::make_pair(part, std::move(query)),
[this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); });
} else {
on_part_query(part, std::move(query));
}
}
update_estimated_limit();
loop();
}
// Processes the result of a part query; a failure is reported via on_error() and stops the loader.
void FileDownloader::on_part_query(Part part, NetQueryPtr query) {
  if (stop_flag_) {
    // important for secret files
    return;
  }

  auto status = try_on_part_query(part, std::move(query));
  if (!status.is_error()) {
    return;
  }
  on_error(std::move(status));
  stop_flag_ = true;
}
// Processes a received part, releases the resources reserved for it, marks the part as ready in
// parts_manager_, updates the download-order debug counters and reports progress.
Status FileDownloader::try_on_part_query(Part part, NetQueryPtr query) {
  TRY_RESULT(written_size, process_part(part, std::move(query)));
  VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
  resource_state_.stop_use(static_cast<int64>(part.size));

  auto prefix_before = parts_manager_.get_unchecked_ready_prefix_count();
  TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, written_size));
  auto prefix_after = parts_manager_.get_unchecked_ready_prefix_count();

  debug_total_parts_++;
  // a part that did not extend the ready prefix is counted as received in bad order
  if (prefix_after == prefix_before) {
    debug_bad_parts_.push_back(part.id);
    debug_bad_part_order_++;
  }

  on_progress();
  return Status::OK();
}
} // namespace td

View File

@ -6,14 +6,21 @@
//
#pragma once
#include "td/telegram/DelayDispatcher.h"
#include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileLoader.h"
#include "td/telegram/files/FileLoaderActor.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/PartsManager.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/files/ResourceState.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/telegram_api.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/OrderedEventsProcessor.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/Status.h"
@ -23,7 +30,7 @@
namespace td {
class FileDownloader final : public FileLoader {
class FileDownloader final : public FileLoaderActor {
public:
class Callback {
public:
@ -58,10 +65,12 @@ class FileDownloader final : public FileLoader {
int32 next_part_ = 0;
bool next_part_stop_ = false;
bool is_small_;
bool need_search_file_{false};
int64 offset_;
int64 limit_;
bool is_small_ = false;
bool need_search_file_ = false;
bool ordered_flag_ = false;
bool keep_fd_ = false;
int64 offset_ = 0;
int64 limit_ = 0;
bool use_cdn_ = false;
DcId cdn_dc_id_;
@ -72,7 +81,7 @@ class FileDownloader final : public FileLoader {
std::map<int32, string> cdn_part_reupload_token_;
std::map<int32, int32> cdn_part_file_token_generation_;
bool need_check_{false};
bool need_check_ = false;
struct HashInfo {
int64 offset;
size_t size;
@ -84,23 +93,82 @@ class FileDownloader final : public FileLoader {
std::set<HashInfo> hash_info_;
bool has_hash_query_ = false;
Result<FileInfo> init() final TD_WARN_UNUSED_RESULT;
Status on_ok(int64 size) final TD_WARN_UNUSED_RESULT;
void on_error(Status status) final;
Result<bool> should_restart_part(Part part, const NetQueryPtr &net_query) final TD_WARN_UNUSED_RESULT;
Result<NetQueryPtr> start_part(Part part, int32 part_count, int64 streaming_offset) final TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) final TD_WARN_UNUSED_RESULT;
void on_progress(Progress progress) final;
Status process_check_query(NetQueryPtr net_query) final;
Result<CheckInfo> check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) final;
static constexpr uint8 COMMON_QUERY_KEY = 2;
bool stop_flag_ = false;
ActorShared<ResourceManager> resource_manager_;
ResourceState resource_state_;
PartsManager parts_manager_;
std::map<uint64, std::pair<Part, ActorShared<>>> part_map_;
OrderedEventsProcessor<std::pair<Part, NetQueryPtr>> ordered_parts_;
ActorOwn<DelayDispatcher> delay_dispatcher_;
double next_delay_ = 0;
uint32 debug_total_parts_ = 0;
uint32 debug_bad_part_order_ = 0;
std::vector<int32> debug_bad_parts_;
void hangup() final;
void hangup_shared() final;
void on_error(Status status);
Result<bool> should_restart_part(Part part, const NetQueryPtr &net_query) TD_WARN_UNUSED_RESULT;
Status process_check_query(NetQueryPtr net_query);
Status check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready);
struct FileInfo {
int64 size{0};
int64 expected_size{0};
bool is_size_final{false};
int32 part_size{0};
std::vector<int> ready_parts;
bool use_part_count_limit{true};
bool only_check{false};
bool need_delay{false};
int64 offset{0};
int64 limit{0};
bool is_upload{false};
};
Result<FileInfo> init() TD_WARN_UNUSED_RESULT;
Result<NetQueryPtr> start_part(Part part, int32 part_count, int64 streaming_offset) TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT;
void add_hash_info(const std::vector<telegram_api::object_ptr<telegram_api::fileHash>> &hashes);
bool keep_fd_ = false;
void keep_fd_flag(bool keep_fd) final;
void try_release_fd();
Status acquire_fd() TD_WARN_UNUSED_RESULT;
Status check_net_query(NetQueryPtr &net_query);
void set_resource_manager(ActorShared<ResourceManager> resource_manager) final;
void update_priority(int8 priority) final;
void update_resources(const ResourceState &other) final;
void update_local_file_location(const LocalFileLocation &local) final {
}
void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) final;
void start_up() final;
void loop() final;
Status do_loop();
void tear_down() final;
void update_estimated_limit();
void on_progress();
void on_result(NetQueryPtr query) final;
void on_part_query(Part part, NetQueryPtr query);
void on_common_query(NetQueryPtr query);
Status try_on_part_query(Part part, NetQueryPtr query);
};
} // namespace td

View File

@ -9,6 +9,7 @@
#include "td/telegram/files/FileLoaderActor.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/actor/actor.h"

View File

@ -1,329 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileLoader.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/ScopeGuard.h"
namespace td {
// Takes ownership of the resource manager handle and sends it the current resource state.
void FileLoader::set_resource_manager(ActorShared<ResourceManager> resource_manager) {
  resource_manager_ = std::move(resource_manager);

  send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
// Forwards the new priority to the resource manager.
void FileLoader::update_priority(int8 priority) {
  send_closure(resource_manager_, &ResourceManager::update_priority, priority);
}
// Applies a resource update received for this loader and re-runs the main loop,
// since the new state may allow more parts to be started.
void FileLoader::update_resources(const ResourceState &other) {
  resource_state_.update_slave(other);
  VLOG(file_loader) << "Update resources " << resource_state_;

  loop();
}
// Sets ordered_flag_; when it is set, part results are routed through ordered_parts_ in on_result().
void FileLoader::set_ordered_flag(bool flag) {
  ordered_flag_ = flag;
}
// Returns the part size currently used by parts_manager_.
size_t FileLoader::get_part_size() const {
  return parts_manager_.get_part_size();
}
// Hangup handler: stops immediately unless a delay dispatcher exists, in which case only the
// dispatcher is reset here.
// NOTE(review): presumably the final stop() then happens in hangup_shared() with link token 1 - confirm.
void FileLoader::hangup() {
  if (!delay_dispatcher_.empty()) {
    delay_dispatcher_.reset();
    return;
  }
  stop();
}
// Shared-reference hangup handler: stops the actor only for link token 1, which is the token
// handed to the DelayDispatcher in start_up(); other tokens are ignored.
void FileLoader::hangup_shared() {
  if (get_link_token() != 1) {
    return;
  }
  stop();
}
// Updates the streaming window. When the offset changes, in-flight part queries outside the
// protected range [first_kept_part, kept_end_part) are cancelled; when only the limit changes,
// just the limit is updated. Afterwards the resource estimate is refreshed and the loop is run.
void FileLoader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
  if (parts_manager_.get_streaming_offset() == offset) {
    parts_manager_.set_streaming_limit(limit);
  } else {
    auto first_kept_part = parts_manager_.set_streaming_offset(offset, limit);
    auto window_end_part = limit <= 0 ? parts_manager_.get_part_count()
                                      : narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
    // number of whole parts that fit into max_resource_limit
    auto part_budget = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size());
    auto kept_end_part = first_kept_part + td::min(part_budget, window_end_part - first_kept_part);
    VLOG(file_loader) << "Protect parts " << first_kept_part << " ... " << kept_end_part - 1;

    for (auto &entry : part_map_) {
      auto &part = entry.second.first;
      auto &cancel_token = entry.second.second;
      if (cancel_token.empty()) {
        continue;
      }
      if (part.id < first_kept_part || part.id >= kept_end_part) {
        VLOG(file_loader) << "Cancel part " << part.id;
        cancel_token.reset();  // cancel_query(it.second.second);
      }
    }
  }

  update_estimated_limit();
  loop();
}
// Actor start-up hook: obtains the initial file description from init(), initializes
// parts_manager_ with it, sets up ordered processing and the delay dispatcher when requested,
// reports the initial progress and yields.
// Any failure is reported through on_error() and sets stop_flag_.
void FileLoader::start_up() {
  auto r_file_info = init();
  if (r_file_info.is_error()) {
    on_error(r_file_info.move_as_error());
    stop_flag_ = true;
    return;
  }
  // move the value out of the Result instead of copying it: FileInfo owns the ready_parts vector
  auto file_info = r_file_info.move_as_ok();
  auto size = file_info.size;
  auto expected_size = max(size, file_info.expected_size);
  bool is_size_final = file_info.is_size_final;
  auto part_size = file_info.part_size;
  auto &ready_parts = file_info.ready_parts;
  auto use_part_count_limit = file_info.use_part_count_limit;
  bool is_upload = file_info.is_upload;

  // Two cases when FILE_UPLOAD_RESTART will happen
  // 1. File is ready, size is final. But there are more uploaded parts than size of the file
  // pm.init(1, 100000, true, 10, {0, 1, 2}, false, true).ensure_error();
  // This error is definitely ok, because we are using actual size of the file on disk (mtime is checked by
  // somebody else). And actual size could change arbitrarily.
  //
  // 2. File size is not final, and some parts ending after known file size were uploaded
  // pm.init(0, 100000, false, 10, {0, 1, 2}, false, true).ensure_error();
  // This can happen only if file state became inconsistent at some point. For example, local location was deleted,
  // but partial remote location was kept. This is possible, but probably should be fixed.
  auto status =
      parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit, is_upload);
  LOG(DEBUG) << "Start " << (is_upload ? "up" : "down") << "loading a file of size " << size << " with expected "
             << (is_size_final ? "exact" : "approximate") << " size " << expected_size << ", part size " << part_size
             << " and " << ready_parts.size() << " ready parts: " << status;
  if (status.is_error()) {
    on_error(std::move(status));
    stop_flag_ = true;
    return;
  }
  if (file_info.only_check) {
    parts_manager_.set_checked_prefix_size(0);
  }
  parts_manager_.set_streaming_offset(file_info.offset, file_info.limit);
  if (ordered_flag_) {
    // ordered processing starts right after the already ready prefix
    ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
  }
  if (file_info.need_delay) {
    // link token 1 identifies the dispatcher's reference in hangup_shared()
    delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1));
    next_delay_ = 0.05;
  }
  resource_state_.set_unit_size(parts_manager_.get_part_size());
  update_estimated_limit();
  on_progress_impl();
  yield();
}
// Main actor loop: runs do_loop() unless the loader is already stopped.
// An error with code -1 is silently ignored; any other error is reported via on_error()
// and stops the loader.
void FileLoader::loop() {
  if (stop_flag_) {
    return;
  }

  auto status = do_loop();
  if (!status.is_error() || status.code() == -1) {
    return;
  }

  on_error(std::move(status));
  stop_flag_ = true;
}
// Performs one scheduling pass: applies the outcome of check_loop(), finalizes the file through
// on_ok() when parts_manager_ reports that loading may finish, and otherwise starts new part
// queries while resources allow. Returns the first error produced by any step.
Status FileLoader::do_loop() {
TRY_RESULT(check_info,
check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
parts_manager_.unchecked_ready()));
if (check_info.changed) {
on_progress_impl();
}
// check queries are dispatched with a link token carrying COMMON_QUERY_KEY,
// which is how on_result() tells them apart from part queries
for (auto &query : check_info.queries) {
G()->net_query_dispatcher().dispatch_with_callback(
std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY)));
}
if (check_info.need_check) {
parts_manager_.set_need_check();
parts_manager_.set_checked_prefix_size(check_info.checked_prefix_size);
}
if (parts_manager_.may_finish()) {
// completion path
TRY_STATUS(parts_manager_.finish());
TRY_STATUS(on_ok(parts_manager_.get_size()));
LOG(INFO) << "Bad download order rate: "
<< (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
<< debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_);
stop_flag_ = true;
return Status::OK();
}
// after_start_parts() runs on every exit from the code below, including early error returns
TRY_STATUS(before_start_parts());
SCOPE_EXIT {
after_start_parts();
};
// start new parts until less than one part size of resources is unused or no part is returned
while (true) {
if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) {
VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource";
break;
}
TRY_RESULT(part, parts_manager_.start_part());
if (part.size == 0) {
// an empty part ends the pass
break;
}
VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
resource_state_.start_use(static_cast<int64>(part.size));
TRY_RESULT(query, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset()));
// the unique identifier is used as the link token by which on_result() finds the part;
// the stored cancel-slot handle is what gets reset to cancel the query
uint64 unique_id = UniqueId::next();
part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
// part_map_[unique_id] = std::make_pair(part, query.get_weak());
auto callback = actor_shared(this, unique_id);
if (delay_dispatcher_.empty()) {
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
} else {
query->debug("sent to DelayDispatcher");
send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query),
std::move(callback), next_delay_);
// each subsequent delay is 0.8 of the previous one, but not less than 0.003
next_delay_ = max(next_delay_ * 0.8, 0.003);
}
}
return Status::OK();
}
// Actor tear-down hook: cancels every in-flight part query, clears queries that are still
// waiting in the ordered queue and silently closes the delay dispatcher, if there is one.
void FileLoader::tear_down() {
  for (auto &entry : part_map_) {
    auto &cancel_token = entry.second.second;
    cancel_token.reset();  // cancel_query(it.second.second);
  }

  ordered_parts_.clear([](auto &&pending) { pending.second->clear(); });

  if (delay_dispatcher_.empty()) {
    return;
  }
  send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
}
// Refreshes the estimated resource limit from parts_manager_. When a resource manager is set,
// also tells the subclass whether the file descriptor should be kept and sends the new state
// to the manager. Does nothing once the loader is stopped.
void FileLoader::update_estimated_limit() {
  if (stop_flag_) {
    return;
  }

  auto estimated_extra = parts_manager_.get_estimated_extra();
  resource_state_.update_estimated_limit(estimated_extra);
  VLOG(file_loader) << "Update estimated limit " << estimated_extra;

  if (resource_manager_.empty()) {
    return;
  }

  // the descriptor is kept only while the active limit covers at least one part
  keep_fd_flag(narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size());
  send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
// Handles the result of a network query. Queries whose link token carries COMMON_QUERY_KEY go to
// on_common_query(); all other results are matched to a part through part_map_, then either
// restarted or passed on to on_part_query().
void FileLoader::on_result(NetQueryPtr query) {
if (stop_flag_) {
return;
}
auto unique_id = get_link_token();
if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) {
on_common_query(std::move(query));
// loop() returns immediately if on_common_query() has set stop_flag_
return loop();
}
auto it = part_map_.find(unique_id);
if (it == part_map_.end()) {
LOG(WARNING) << "Receive result for unknown part";
return;
}
Part part = it->second.first;
// the query is already finished, so the stored handle is released instead of being reset
it->second.second.release();
CHECK(query->is_ready());
part_map_.erase(it);
bool next = false;
auto status = [&] {
TRY_RESULT(should_restart, should_restart_part(part, query));
// a canceled query is always restarted
if (query->is_error() && query->error().code() == NetQuery::Error::Canceled) {
should_restart = true;
}
if (should_restart) {
VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
parts_manager_.on_part_failed(part.id);
} else {
next = true;
}
return Status::OK();
}();
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
if (next) {
if (ordered_flag_) {
// with ordered_flag_ set, results go through ordered_parts_, keyed by part identifier
auto seq_no = part.id;
ordered_parts_.add(
seq_no, std::make_pair(part, std::move(query)),
[this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); });
} else {
on_part_query(part, std::move(query));
}
}
update_estimated_limit();
loop();
}
// Processes the result of a part query; a failure is reported via on_error() and stops the loader.
void FileLoader::on_part_query(Part part, NetQueryPtr query) {
  if (stop_flag_) {
    // important for secret files
    return;
  }

  auto status = try_on_part_query(part, std::move(query));
  if (!status.is_error()) {
    return;
  }
  on_error(std::move(status));
  stop_flag_ = true;
}
// Passes the result of a check query to process_check_query(); a failure is reported via
// on_error() and stops the loader.
void FileLoader::on_common_query(NetQueryPtr query) {
  auto status = process_check_query(std::move(query));
  if (!status.is_error()) {
    return;
  }
  on_error(std::move(status));
  stop_flag_ = true;
}
// Processes a received part, releases the resources reserved for it, marks the part as ready in
// parts_manager_, updates the part-order debug counters and reports progress.
Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
  TRY_RESULT(processed_size, process_part(part, std::move(query)));
  VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
  resource_state_.stop_use(static_cast<int64>(part.size));

  auto prefix_before = parts_manager_.get_unchecked_ready_prefix_count();
  TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, processed_size));
  auto prefix_after = parts_manager_.get_unchecked_ready_prefix_count();

  debug_total_parts_++;
  // a part that did not extend the ready prefix is counted as received in bad order
  if (prefix_after == prefix_before) {
    debug_bad_parts_.push_back(part.id);
    debug_bad_part_order_++;
  }

  on_progress_impl();
  return Status::OK();
}
void FileLoader::on_progress_impl() {
Progress progress;
progress.part_count = parts_manager_.get_part_count();
progress.part_size = static_cast<int32>(parts_manager_.get_part_size());
progress.ready_part_count = parts_manager_.get_ready_prefix_count();
progress.ready_bitmask = parts_manager_.get_bitmask();
progress.is_ready = parts_manager_.ready();
progress.ready_size = parts_manager_.get_ready_size();
progress.size = parts_manager_.get_size_or_zero();
on_progress(std::move(progress));
}
} // namespace td

View File

@ -1,128 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/DelayDispatcher.h"
#include "td/telegram/files/FileLoaderActor.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/PartsManager.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/files/ResourceState.h"
#include "td/telegram/net/NetQuery.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/OrderedEventsProcessor.h"
#include "td/utils/Status.h"
#include <map>
#include <utility>
namespace td {
// Base actor that drives loading of a file split into parts: it owns the PartsManager and the
// resource accounting, dispatches part queries and delegates the transfer-specific steps to the
// virtual hooks implemented by subclasses.
class FileLoader : public FileLoaderActor {
public:
// FileLoaderActor interface
void set_resource_manager(ActorShared<ResourceManager> resource_manager) final;
void update_priority(int8 priority) final;
void update_resources(const ResourceState &other) final;
// intentionally does nothing
void update_local_file_location(const LocalFileLocation &local) final {
}
void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) final;
protected:
// when the flag is set, part results are processed through ordered_parts_
void set_ordered_flag(bool flag);
size_t get_part_size() const;
// initial description of the file, returned by init() and consumed by start_up()
struct FileInfo {
int64 size{0};
int64 expected_size{0};
bool is_size_final{false};
int32 part_size{0};
std::vector<int> ready_parts;
bool use_part_count_limit{true};
bool only_check{false};
bool need_delay{false};  // if true, queries are sent through a DelayDispatcher
int64 offset{0};
int64 limit{0};
bool is_upload{false};
};
// called once from start_up()
virtual Result<FileInfo> init() TD_WARN_UNUSED_RESULT = 0;
// called from do_loop() after parts_manager_ has finished; receives the final size
virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0;
// called with every fatal error; the loader sets stop_flag_ right after the call
virtual void on_error(Status status) = 0;
// called in do_loop() before new parts are started
virtual Status before_start_parts() {
return Status::OK();
}
// creates the network query for the given part
virtual Result<NetQueryPtr> start_part(Part part, int part_count, int64 streaming_offset) TD_WARN_UNUSED_RESULT = 0;
// called when do_loop() leaves the part-starting section
virtual void after_start_parts() {
}
// handles the result of a part query; the returned size is passed to PartsManager::on_part_ok
virtual Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT = 0;
// snapshot of the parts_manager_ state passed to on_progress()
struct Progress {
int32 part_count{0};
int32 part_size{0};
int32 ready_part_count{0};
string ready_bitmask;
bool is_ready{false};
int64 ready_size{0};
int64 size{0};
};
virtual void on_progress(Progress progress) = 0;
// decides whether a finished part query must be restarted; canceled queries are restarted regardless
virtual Result<bool> should_restart_part(Part part, const NetQueryPtr &net_query) TD_WARN_UNUSED_RESULT {
return false;
}
// handles the result of a query dispatched from CheckInfo::queries
virtual Status process_check_query(NetQueryPtr net_query) {
return Status::Error("Unsupported");
}
// result of check_loop(), applied by do_loop()
struct CheckInfo {
bool need_check{false};
bool changed{false};
int64 checked_prefix_size{0};
std::vector<NetQueryPtr> queries;
};
virtual Result<CheckInfo> check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) {
return CheckInfo{};
}
// called from update_estimated_limit() with whether the active limit covers at least one part
virtual void keep_fd_flag(bool keep_fd) {
}
private:
// key embedded into the link token of check queries
static constexpr uint8 COMMON_QUERY_KEY = 2;
// set after completion or a fatal error; most entry points return early once it is set
bool stop_flag_ = false;
ActorShared<ResourceManager> resource_manager_;
ResourceState resource_state_;
PartsManager parts_manager_;
// in-flight part queries: link token -> (part, handle used to cancel the query)
std::map<uint64, std::pair<Part, ActorShared<>>> part_map_;
bool ordered_flag_ = false;
OrderedEventsProcessor<std::pair<Part, NetQueryPtr>> ordered_parts_;
ActorOwn<DelayDispatcher> delay_dispatcher_;
double next_delay_ = 0;
// statistics logged when loading finishes
uint32 debug_total_parts_ = 0;
uint32 debug_bad_part_order_ = 0;
std::vector<int32> debug_bad_parts_;
void start_up() final;
void loop() final;
Status do_loop();
void hangup() final;
void hangup_shared() final;
void tear_down() final;
void update_estimated_limit();
void on_progress_impl();
void on_result(NetQueryPtr query) final;
void on_part_query(Part part, NetQueryPtr query);
void on_common_query(NetQueryPtr query);
Status try_on_part_query(Part part, NetQueryPtr query);
};
} // namespace td