//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileDownloader.h"

#include "td/telegram/FileReferenceManager.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/SecureStorage.h"
#include "td/telegram/telegram_api.h"
#include "td/telegram/UniqueId.h"

#include "td/utils/as.h"
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
#include "td/utils/port/Stat.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/UInt.h"

#include <tuple>

namespace td {
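
// Downloads a single file part by part, optionally through a Telegram CDN, decrypting
// secret-chat and secure-storage content on the fly and verifying downloaded data against
// server-provided SHA-256 hashes when required.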
FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size,
                               string name, const FileEncryptionKey &encryption_key, bool is_small,
                               bool need_search_file, int64 offset, int64 limit, unique_ptr<Callback> callback)
    : remote_(remote)
    , local_(local)
    , size_(size)
    , name_(std::move(name))
    , encryption_key_(encryption_key)
    , callback_(std::move(callback))
    , is_small_(is_small)
    , need_search_file_(need_search_file)
    , ordered_flag_(encryption_key_.is_secret())
    , offset_(offset)
    , limit_(limit) {
  if (!encryption_key.empty()) {
    CHECK(offset_ == 0);
  }
}

void FileDownloader::on_error(Status status) {
  fd_.close();
  stop_flag_ = true;
  callback_->on_error(std::move(status));
}
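
// Decides whether a completed part query must be restarted: FILE_TOKEN_INVALID switches the
// download back from the CDN to the main DC, a CDN redirect switches it to the CDN, and
// upload.cdnFileReuploadNeeded makes the part wait until the file is reuploaded to the CDN
// with the received request token.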
Result<bool> FileDownloader::should_restart_part(Part part, const NetQueryPtr &net_query) {
  // Check whether we should switch to the CDN or reupload the file to the CDN

  if (net_query->is_error()) {
    if (net_query->error().message() == "FILE_TOKEN_INVALID") {
      use_cdn_ = false;
      return true;
    }
    if (net_query->error().message() == "REQUEST_TOKEN_INVALID") {
      return true;
    }
    return false;
  }

  switch (narrow_cast<QueryType>(UniqueId::extract_key(net_query->id()))) {
    case QueryType::Default: {
      if (net_query->ok_tl_constructor() == telegram_api::upload_fileCdnRedirect::ID) {
        TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(net_query->ok()));
        CHECK(file_base->get_id() == telegram_api::upload_fileCdnRedirect::ID);
        auto file = move_tl_object_as<telegram_api::upload_fileCdnRedirect>(file_base);
        LOG(DEBUG) << "Downloading of part " << part.id << " was redirected to " << oneline(to_string(file));

        auto new_cdn_file_token = file->file_token_.as_slice();
        if (cdn_file_token_ == new_cdn_file_token) {
          return true;
        }

        use_cdn_ = true;
        need_check_ = true;
        cdn_file_token_generation_++;
        cdn_file_token_ = new_cdn_file_token.str();
        cdn_dc_id_ = DcId::external(file->dc_id_);
        cdn_encryption_key_ = file->encryption_key_.as_slice().str();
        cdn_encryption_iv_ = file->encryption_iv_.as_slice().str();
        add_hash_info(file->file_hashes_);
        if (cdn_encryption_iv_.size() != 16 || cdn_encryption_key_.size() != 32) {
          return Status::Error("Wrong ctr key or iv size");
        }

        return true;
      }
      return false;
    }
    case QueryType::ReuploadCDN: {
      TRY_RESULT(file_hashes, fetch_result<telegram_api::upload_reuploadCdnFile>(net_query->ok()));
      add_hash_info(file_hashes);
      LOG(DEBUG) << "Part " << part.id << " was reuploaded to CDN";
      return true;
    }
    case QueryType::CDN: {
      if (net_query->ok_tl_constructor() == telegram_api::upload_cdnFileReuploadNeeded::ID) {
        TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(net_query->ok()));
        CHECK(file_base->get_id() == telegram_api::upload_cdnFileReuploadNeeded::ID);
        auto file = move_tl_object_as<telegram_api::upload_cdnFileReuploadNeeded>(file_base);
        LOG(DEBUG) << "Part " << part.id << " must be reuploaded to " << oneline(to_string(file));
        cdn_part_reupload_token_[part.id] = file->request_token_.as_slice().str();
        return true;
      }
      auto it = cdn_part_file_token_generation_.find(part.id);
      CHECK(it != cdn_part_file_token_generation_.end());
      if (it->second != cdn_file_token_generation_) {
        LOG(DEBUG) << "Receive part " << part.id << " with an old file_token";
        return true;
      }
      return false;
    }
    default:
      UNREACHABLE();
  }

  return false;
}
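
// Builds the network query for a single part: upload.getWebFile for web files,
// upload.getFile otherwise, and upload.getCdnFile or upload.reuploadCdnFile once the
// download has been redirected to a CDN. For secret files the size of the last part is
// rounded up to a multiple of 16 bytes, because the encrypted payload is padded to the
// AES block size.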
Result<NetQueryPtr> FileDownloader::start_part(Part part, int32 part_count, int64 streaming_offset) {
  if (encryption_key_.is_secret()) {
    part.size = (part.size + 15) & ~15;  // fix for last part
  }
  // auto size = part.size;
  //// sometimes we can ask more than server has, just to check size
  // if (size < parts_manager_.get_part_size()) {
  // size = min(size + 16, parts_manager_.get_part_size());
  // LOG(INFO) << "Ask " << size << " instead of " << part.size;
  //}
  auto size = parts_manager_.get_part_size();
  CHECK(part.size <= size);

  callback_->on_start_download();

  auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download;
  NetQueryPtr net_query;
  if (!use_cdn_) {
    int32 flags = 0;
#if !TD_EMSCRIPTEN
    // CDN is supported, unless we use domains instead of IPs from a browser
    if (streaming_offset == 0) {
      flags |= telegram_api::upload_getFile::CDN_SUPPORTED_MASK;
    }
#endif
    DcId dc_id = remote_.is_web() ? G()->get_webfile_dc_id() : remote_.get_dc_id();
    auto unique_id = UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Default));
    net_query =
        remote_.is_web()
            ? G()->net_query_creator().create(
                  unique_id, nullptr,
                  telegram_api::upload_getWebFile(remote_.as_input_web_file_location(),
                                                  narrow_cast<int32>(part.offset), narrow_cast<int32>(size)),
                  {}, dc_id, net_query_type, NetQuery::AuthFlag::On)
            : G()->net_query_creator().create(
                  unique_id, nullptr,
                  telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/,
                                               remote_.as_input_file_location(), part.offset,
                                               narrow_cast<int32>(size)),
                  {}, dc_id, net_query_type, NetQuery::AuthFlag::On);
  } else {
    if (remote_.is_web()) {
      return Status::Error("Can't download web file from CDN");
    }
    auto it = cdn_part_reupload_token_.find(part.id);
    if (it == cdn_part_reupload_token_.end()) {
      auto query =
          telegram_api::upload_getCdnFile(BufferSlice(cdn_file_token_), part.offset, narrow_cast<int32>(size));
      cdn_part_file_token_generation_[part.id] = cdn_file_token_generation_;
      net_query =
          G()->net_query_creator().create(UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::CDN)),
                                          nullptr, query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off);
    } else {
      auto query = telegram_api::upload_reuploadCdnFile(BufferSlice(cdn_file_token_), BufferSlice(it->second));
      net_query = G()->net_query_creator().create(
          UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), nullptr, query, {},
          remote_.get_dc_id(), net_query_type, NetQuery::AuthFlag::On);
      cdn_part_reupload_token_.erase(it);
    }
  }
  net_query->file_type_ = narrow_cast<int32>(remote_.file_type_);
  return std::move(net_query);
}
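
// Converts a query error into a download error; file reference errors are tagged with the
// base64-encoded file reference, so that the stale reference can be identified and refreshed
// by the caller.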
Status FileDownloader::check_net_query(NetQueryPtr &net_query) {
  if (net_query->is_error()) {
    auto error = net_query->move_as_error();
    if (FileReferenceManager::is_file_reference_error(error)) {
      VLOG(file_references) << "Receive " << error << " for being downloaded file";
      error = Status::Error(error.code(),
                            PSLICE() << error.message() << "#BASE64" << base64_encode(remote_.get_file_reference()));
    }
    return error;
  }
  return Status::OK();
}
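
// Handles the response for a single part: extracts the received bytes, removes the CDN
// encryption layer if needed, decrypts secret-chat content, and writes the result to the
// local file at the part offset.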
Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
  TRY_STATUS(check_net_query(net_query));

  BufferSlice bytes;
  bool need_cdn_decrypt = false;
  auto query_type = narrow_cast<QueryType>(UniqueId::extract_key(net_query->id()));
  switch (query_type) {
    case QueryType::Default: {
      if (remote_.is_web()) {
        TRY_RESULT(file, fetch_result<telegram_api::upload_getWebFile>(std::move(net_query)));
        bytes = std::move(file->bytes_);
      } else {
        TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(std::move(net_query)));
        CHECK(file_base->get_id() == telegram_api::upload_file::ID);
        auto file = move_tl_object_as<telegram_api::upload_file>(file_base);
        LOG(DEBUG) << "Receive part " << part.id << ": " << to_string(file);
        bytes = std::move(file->bytes_);
      }
      break;
    }
    case QueryType::CDN: {
      TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(std::move(net_query)));
      CHECK(file_base->get_id() == telegram_api::upload_cdnFile::ID);
      auto file = move_tl_object_as<telegram_api::upload_cdnFile>(file_base);
      LOG(DEBUG) << "Receive part " << part.id << " from CDN: " << to_string(file);
      bytes = std::move(file->bytes_);
      need_cdn_decrypt = true;
      break;
    }
    default:
      UNREACHABLE();
  }

  auto padded_size = part.size;
  if (encryption_key_.is_secret()) {
    padded_size = (part.size + 15) & ~15;
  }
  if (bytes.size() > padded_size) {
    return Status::Error("Part size is more than requested");
  }
  if (bytes.empty()) {
    return 0;
  }

  // Encryption
  if (need_cdn_decrypt) {
    // The CDN layer is AES-CTR with a 256-bit key; the counter for this part is its 16-byte
    // block index, stored big-endian in the last 4 bytes of the IV (hence the byte swap below
    // on little-endian hosts).
    CHECK(part.offset % 16 == 0);
    auto offset = narrow_cast<uint32>(part.offset / 16);
    offset =
        ((offset & 0xff) << 24) | ((offset & 0xff00) << 8) | ((offset & 0xff0000) >> 8) | ((offset & 0xff000000) >> 24);

    AesCtrState ctr_state;
    string iv = cdn_encryption_iv_;
    as<uint32>(&iv[12]) = offset;
    ctr_state.init(cdn_encryption_key_, iv);
    ctr_state.decrypt(bytes.as_slice(), bytes.as_mutable_slice());
  }
  if (encryption_key_.is_secret()) {
    // Secret-chat files are decrypted with AES-IGE; the IV is chained across parts, so parts
    // must be decrypted strictly in order.
    LOG_CHECK(next_part_ == part.id) << tag("expected part.id", next_part_) << "!=" << tag("part.id", part.id);
    CHECK(!next_part_stop_);
    next_part_++;
    if (part.size % 16 != 0) {
      next_part_stop_ = true;
    }
    aes_ige_decrypt(as_slice(encryption_key_.key()), as_mutable_slice(encryption_key_.mutable_iv()), bytes.as_slice(),
                    bytes.as_mutable_slice());
  }

  auto slice = bytes.as_slice().substr(0, part.size);
  TRY_STATUS(acquire_fd());
  LOG(INFO) << "Receive " << slice.size() << " bytes at offset " << part.offset << " for \"" << path_ << '"';
  TRY_RESULT(written, fd_.pwrite(slice, part.offset));
  LOG(INFO) << "Written " << written << " bytes";
  // may write less than part.size when the size of the downloaded file is unknown
  if (written != slice.size()) {
    return Status::Error("Failed to save file part to the file");
  }
  return written;
}
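
// Reports a partial local file location to the callback. For secret files the current IV is
// included as well, which is possible only while parts are decrypted strictly in order.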
void FileDownloader::on_progress() {
  if (parts_manager_.ready()) {
    // do not send the partial location; it would lead to a wrong local_size
    return;
  }
  auto ready_size = parts_manager_.get_ready_size();
  if (ready_size == 0 || path_.empty()) {
    return;
  }
  auto part_size = static_cast<int32>(parts_manager_.get_part_size());
  auto size = parts_manager_.get_size_or_zero();
  if (encryption_key_.empty() || encryption_key_.is_secure()) {
    callback_->on_partial_download(
        PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", parts_manager_.get_bitmask()}, ready_size,
        size);
  } else if (encryption_key_.is_secret()) {
    UInt256 iv;
    auto ready_part_count = parts_manager_.get_ready_prefix_count();
    if (ready_part_count == next_part_) {
      iv = encryption_key_.mutable_iv();
    } else {
      LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_);
    }
    callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(),
                                                            parts_manager_.get_bitmask()},
                                   ready_size, size);
  } else {
    UNREACHABLE();
  }
}

Status FileDownloader::process_check_query(NetQueryPtr net_query) {
  has_hash_query_ = false;
  TRY_STATUS(check_net_query(net_query));
  TRY_RESULT(file_hashes, fetch_result<telegram_api::upload_getFileHashes>(std::move(net_query)));
  add_hash_info(file_hashes);
  return Status::OK();
}
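
// Verifies the already downloaded prefix of the file against known SHA-256 hashes and
// requests more hashes with upload.getFileHashes when the known ones are exhausted.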
Status FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) {
  if (!need_check_) {
    return Status::OK();
  }
  SCOPE_EXIT {
    try_release_fd();
  };
  bool is_changed = false;
  vector<NetQueryPtr> queries;
  while (checked_prefix_size < ready_prefix_size) {
    //LOG(ERROR) << "NEED TO CHECK: " << checked_prefix_size << "->" << ready_prefix_size - checked_prefix_size;
    HashInfo search_info;
    search_info.offset = checked_prefix_size;
    auto it = hash_info_.upper_bound(search_info);
    if (it != hash_info_.begin()) {
      --it;
    }
    if (it != hash_info_.end() && it->offset <= checked_prefix_size &&
        it->offset + narrow_cast<int64>(it->size) > checked_prefix_size) {
      int64 begin_offset = it->offset;
      int64 end_offset = it->offset + narrow_cast<int64>(it->size);
      if (ready_prefix_size < end_offset) {
        if (!is_ready) {
          break;
        }
        end_offset = ready_prefix_size;
      }
      auto size = narrow_cast<size_t>(end_offset - begin_offset);
      auto slice = BufferSlice(size);
      TRY_STATUS(acquire_fd());
      TRY_RESULT(read_size, fd_.pread(slice.as_mutable_slice(), begin_offset));
      if (size != read_size) {
        return Status::Error("Failed to read file to check hash");
      }
      string hash(32, ' ');
      sha256(slice.as_slice(), hash);

      if (hash != it->hash) {
        if (only_check_) {
          return Status::Error("FILE_DOWNLOAD_RESTART");
        }
        return Status::Error("Hash mismatch");
      }

      checked_prefix_size = end_offset;
      is_changed = true;
      continue;
    }
    if (!has_hash_query_) {
      has_hash_query_ = true;
      auto query = telegram_api::upload_getFileHashes(remote_.as_input_file_location(), checked_prefix_size);
      auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download;
      auto net_query = G()->net_query_creator().create(query, {}, remote_.get_dc_id(), net_query_type);
      queries.push_back(std::move(net_query));
      break;
    }
    // Should fail?
    break;
  }

  if (is_changed) {
    on_progress();
  }
  for (auto &query : queries) {
    G()->net_query_dispatcher().dispatch_with_callback(
        std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY)));
  }
  if (need_check_) {
    parts_manager_.set_need_check();
    parts_manager_.set_checked_prefix_size(checked_prefix_size);
  }

  return Status::OK();
}

void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<telegram_api::fileHash>> &hashes) {
  for (auto &hash : hashes) {
    //LOG(ERROR) << "ADD HASH " << hash->offset_ << "->" << hash->limit_;
    HashInfo hash_info;
    hash_info.size = hash->limit_;
    hash_info.offset = hash->offset_;
    hash_info.hash = hash->hash_.as_slice().str();
    hash_info_.insert(std::move(hash_info));
  }
}

void FileDownloader::try_release_fd() {
  if (!keep_fd_ && !fd_.empty()) {
    fd_.close();
  }
}

Status FileDownloader::acquire_fd() {
  if (fd_.empty()) {
    if (path_.empty()) {
      TRY_RESULT_ASSIGN(std::tie(fd_, path_), open_temp_file(remote_.file_type_));
    } else {
      TRY_RESULT_ASSIGN(fd_, FileFd::open(path_, (only_check_ ? 0 : FileFd::Write) | FileFd::Read));
    }
  }
  return Status::OK();
}

void FileDownloader::set_resource_manager(ActorShared<ResourceManager> resource_manager) {
  resource_manager_ = std::move(resource_manager);
  send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}

void FileDownloader::update_priority(int8 priority) {
  send_closure(resource_manager_, &ResourceManager::update_priority, priority);
}

void FileDownloader::update_resources(const ResourceState &other) {
  resource_state_.update_slave(other);
  VLOG(file_loader) << "Update resources " << resource_state_;
  loop();
}

void FileDownloader::hangup() {
  if (delay_dispatcher_.empty()) {
    stop();
  } else {
    delay_dispatcher_.reset();
  }
}

void FileDownloader::hangup_shared() {
  if (get_link_token() == 1) {
    stop();
  }
}
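
// Updates the requested streaming range. When the streaming offset changes, pending part
// queries outside the newly protected window are cancelled.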
void FileDownloader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
  if (parts_manager_.get_streaming_offset() != offset) {
    auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit);
    auto new_end_part_id = limit <= 0 ? parts_manager_.get_part_count()
                                      : narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
    auto max_parts = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size());
    auto end_part_id = begin_part_id + td::min(max_parts, new_end_part_id - begin_part_id);
    VLOG(file_loader) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1;
    for (auto &it : part_map_) {
      if (!it.second.second.empty() && !(begin_part_id <= it.second.first.id && it.second.first.id < end_part_id)) {
        VLOG(file_loader) << "Cancel part " << it.second.first.id;
        it.second.second.reset();  // cancel_query(it.second.second);
      }
    }
  } else {
    parts_manager_.set_streaming_limit(limit);
  }
  update_estimated_limit();
  loop();
}
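
// Initializes the download: resumes from a valid partial local file, optionally reuses an
// existing local file found by name and size (in which case only its hashes are checked),
// and configures the parts manager and streaming offset before the first loop iteration.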
void FileDownloader::start_up() {
  if (local_.type() == LocalFileLocation::Type::Full) {
    return on_error(Status::Error("File is already downloaded"));
  }
  if (encryption_key_.is_secure() && !encryption_key_.has_value_hash()) {
    LOG(ERROR) << "Can't download Secure file with unknown value_hash";
  }
  if (remote_.file_type_ == FileType::SecureEncrypted) {
    size_ = 0;
  }
  int32 part_size = 0;
  Bitmask bitmask{Bitmask::Ones{}, 0};
  if (local_.type() == LocalFileLocation::Type::Partial) {
    const auto &partial = local_.partial();
    path_ = partial.path_;
    auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read);
    // TODO: check timestamps..
    if (result_fd.is_ok()) {
      if ((!encryption_key_.is_secret() || partial.iv_.size() == 32) && partial.part_size_ >= 0 &&
          partial.part_size_ <= (1 << 20) && (partial.part_size_ & (partial.part_size_ - 1)) == 0) {
        bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_);
        if (encryption_key_.is_secret()) {
          encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data());
          next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0));
        }
        fd_ = result_fd.move_as_ok();
        part_size = static_cast<int32>(partial.part_size_);
      } else {
        LOG(ERROR) << "Have invalid " << partial;
      }
    }
  }
  if (need_search_file_ && fd_.empty() && size_ > 0 && encryption_key_.empty() && !remote_.is_web()) {
    auto r_path = search_file(remote_.file_type_, name_, size_);
    if (r_path.is_ok()) {
      auto r_fd = FileFd::open(r_path.ok(), FileFd::Read);
      if (r_fd.is_ok()) {
        path_ = r_path.move_as_ok();
        fd_ = r_fd.move_as_ok();
        need_check_ = true;
        only_check_ = true;
        part_size = 128 * (1 << 10);
        bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size};
        LOG(INFO) << "Check hash of local file " << path_;
      }
    }
  }
  try_release_fd();

  auto ready_parts = bitmask.as_vector();
  auto status = parts_manager_.init(size_, size_, true, part_size, ready_parts, false, false);
  LOG(DEBUG) << "Start downloading a file of size " << size_ << ", part size " << part_size << " and "
             << ready_parts.size() << " ready parts: " << status;
  if (status.is_error()) {
    return on_error(std::move(status));
  }
  if (only_check_) {
    parts_manager_.set_checked_prefix_size(0);
  }
  parts_manager_.set_streaming_offset(offset_, limit_);
  if (ordered_flag_) {
    ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
  }
  auto file_type = get_main_file_type(remote_.file_type_);
  if (!is_small_ &&
      (file_type == FileType::VideoNote || file_type == FileType::Document || file_type == FileType::VoiceNote ||
       file_type == FileType::Audio || file_type == FileType::Video || file_type == FileType::Animation ||
       file_type == FileType::VideoStory || (file_type == FileType::Encrypted && size_ > (1 << 20)))) {
    delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1));
    next_delay_ = 0.05;
  }
  resource_state_.set_unit_size(parts_manager_.get_part_size());
  update_estimated_limit();
  on_progress();
  yield();
}

void FileDownloader::loop() {
  if (stop_flag_) {
    return;
  }
  auto status = do_loop();
  if (status.is_error()) {
    if (status.code() == -1) {
      return;
    }
    return on_error(std::move(status));
  }
}
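
// One iteration of the download: runs the hash check, finishes when all parts are ready
// (decrypting secure-storage files and moving the temporary file into place), and otherwise
// starts as many new part queries as the current resource limits allow.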
Status FileDownloader::do_loop() {
  TRY_STATUS(check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
                        parts_manager_.unchecked_ready()));

  if (parts_manager_.may_finish()) {
    TRY_STATUS(parts_manager_.finish());
    fd_.close();
    auto size = parts_manager_.get_size();
    if (encryption_key_.is_secure()) {
      TRY_RESULT(file_path, open_temp_file(remote_.file_type_));
      string tmp_path;
      std::tie(std::ignore, tmp_path) = std::move(file_path);
      TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path));
      unlink(path_).ignore();
      path_ = std::move(tmp_path);
      TRY_RESULT(path_stat, stat(path_));
      size = path_stat.size_;
    }
    string path;
    if (only_check_) {
      path = path_;
    } else {
      TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_));
    }
    callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_);

    LOG(INFO) << "Bad download order rate: "
              << (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
              << debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_);
    stop_flag_ = true;
    return Status::OK();
  }

  while (true) {
    if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) {
      VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource";
      break;
    }
    TRY_RESULT(part, parts_manager_.start_part());
    if (part.size == 0) {
      break;
    }
    VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
    resource_state_.start_use(static_cast<int64>(part.size));

    TRY_RESULT(query, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset()));
    uint64 unique_id = UniqueId::next();
    part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());

    auto callback = actor_shared(this, unique_id);
    if (delay_dispatcher_.empty()) {
      G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
    } else {
      query->debug("sent to DelayDispatcher");
      send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query),
                   std::move(callback), next_delay_);
      next_delay_ = max(next_delay_ * 0.8, 0.003);
    }
  }
  return Status::OK();
}

void FileDownloader::tear_down() {
  for (auto &it : part_map_) {
    it.second.second.reset();  // cancel_query(it.second.second);
  }
  ordered_parts_.clear([](auto &&part) { part.second->clear(); });
  if (!delay_dispatcher_.empty()) {
    send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
  }
}

void FileDownloader::update_estimated_limit() {
  if (stop_flag_) {
    return;
  }
  auto estimated_extra = parts_manager_.get_estimated_extra();
  resource_state_.update_estimated_limit(estimated_extra);
  VLOG(file_loader) << "Update estimated limit " << estimated_extra;
  if (!resource_manager_.empty()) {
    keep_fd_ = narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size();
    try_release_fd();
    send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
  }
}
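
// Routes a completed network query either to the hash-check handler or to the corresponding
// part; parts of secret files are passed to on_part_query in order via ordered_parts_.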
void FileDownloader::on_result(NetQueryPtr query) {
  if (stop_flag_) {
    return;
  }
  auto unique_id = get_link_token();
  if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) {
    auto status = process_check_query(std::move(query));
    if (status.is_error()) {
      on_error(std::move(status));
    } else {
      loop();
    }
    return;
  }
  auto it = part_map_.find(unique_id);
  if (it == part_map_.end()) {
    LOG(WARNING) << "Receive result for unknown part";
    return;
  }

  Part part = it->second.first;
  it->second.second.release();
  CHECK(query->is_ready());
  part_map_.erase(it);

  bool next = false;
  auto status = [&] {
    TRY_RESULT(should_restart, should_restart_part(part, query));
    if (query->is_error() && query->error().code() == NetQuery::Error::Canceled) {
      should_restart = true;
    }
    if (should_restart) {
      VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
      resource_state_.stop_use(static_cast<int64>(part.size));
      parts_manager_.on_part_failed(part.id);
    } else {
      next = true;
    }
    return Status::OK();
  }();
  if (status.is_error()) {
    return on_error(std::move(status));
  }

  if (next) {
    if (ordered_flag_) {
      auto seq_no = part.id;
      ordered_parts_.add(
          seq_no, std::make_pair(part, std::move(query)),
          [this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); });
    } else {
      on_part_query(part, std::move(query));
    }
  }
  update_estimated_limit();
  loop();
}

void FileDownloader::on_part_query(Part part, NetQueryPtr query) {
  if (stop_flag_) {
    // important for secret files
    return;
  }
  auto status = try_on_part_query(part, std::move(query));
  if (status.is_error()) {
    on_error(std::move(status));
  }
}

Status FileDownloader::try_on_part_query(Part part, NetQueryPtr query) {
  TRY_RESULT(size, process_part(part, std::move(query)));
  VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
  resource_state_.stop_use(static_cast<int64>(part.size));
  auto old_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count();
  TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size));
  auto new_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count();
  debug_total_parts_++;
  if (old_ready_prefix_count == new_ready_prefix_count) {
    debug_bad_parts_.push_back(part.id);
    debug_bad_part_order_++;
  }
  on_progress();
  return Status::OK();
}

}  // namespace td