downloadFile: add limit:int32

GitOrigin-RevId: 395d29e8383db5c54a3f85a555c4e9648546ef47
This commit is contained in:
Arseny Smirnov 2019-02-18 22:08:05 +03:00
parent 8b233484e0
commit 206fbc0686
18 changed files with 133 additions and 25 deletions

View File

@ -3068,7 +3068,8 @@ setPinnedChats chat_ids:vector<int53> = Ok;
//@file_id Identifier of the file to download
//@priority Priority of the download (1-32). The higher the priority, the earlier the file will be downloaded. If the priorities of two files are equal, then the last one for which downloadFile was called will be downloaded first
//@offset File will be downloaded starting from that offset in bytes first. Supposed to be used for streaming
downloadFile file_id:int32 priority:int32 offset:int32 = File;
//@limit Download will be automatically cancelled when it is downloaded more or equeal than limit bytes starting from offset.
downloadFile file_id:int32 priority:int32 offset:int32 limit:int32 = File;
//@description Returns file downloaded prefix size from a given offset @file_id Identifier of the file @offset Offset from which downloaded prefix size should be calculated
getFileDownloadedPrefixSize file_id:int32 offset:int32 = Count;

Binary file not shown.

View File

@ -6148,7 +6148,7 @@ void MessagesManager::load_secret_thumbnail(FileId thumbnail_file_id) {
});
send_closure(G()->file_manager(), &FileManager::download, thumbnail_file_id,
std::make_shared<Callback>(std::move(download_promise)), 1, -1);
std::make_shared<Callback>(std::move(download_promise)), 1, -1, -1);
}
void MessagesManager::on_upload_media(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file,

View File

@ -5739,7 +5739,8 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) {
if (request.offset_ < 0) {
return send_error_raw(id, 5, "Download offset must be non-negative");
}
file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority, request.offset_);
file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority, request.offset_,
request.limit_);
auto file = file_manager_->get_file_object(FileId(request.file_id_, 0), false);
if (file->id_ == 0) {
@ -5750,7 +5751,7 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) {
}
void Td::on_request(uint64 id, const td_api::cancelDownloadFile &request) {
file_manager_->download(FileId(request.file_id_, 0), nullptr, request.only_if_pending_ ? -1 : 0, -1);
file_manager_->download(FileId(request.file_id_, 0), nullptr, request.only_if_pending_ ? -1 : 0, -1, -1);
send_closure(actor_id(this), &Td::send_result, id, make_tl_object<td_api::ok>());
}

View File

@ -2353,26 +2353,31 @@ class CliClient final : public Actor {
string file_id;
string priority;
string offset;
string limit;
std::tie(file_id, args) = split(args);
std::tie(offset, priority) = split(args);
std::tie(offset, args) = split(args);
std::tie(limit, priority) = split(args);
if (priority.empty()) {
priority = "1";
}
send_request(make_tl_object<td_api::downloadFile>(as_file_id(file_id), to_integer<int32>(priority),
to_integer<int32>(offset)));
to_integer<int32>(offset), to_integer<int32>(limit)));
} else if (op == "dff") {
string max_file_id;
string priority;
string offset;
string limit;
std::tie(max_file_id, args) = split(args);
std::tie(offset, priority) = split(args);
std::tie(offset, args) = split(args);
std::tie(limit, priority) = split(args);
if (priority.empty()) {
priority = "1";
}
for (int i = 1; i <= as_file_id(max_file_id); i++) {
send_request(make_tl_object<td_api::downloadFile>(i, to_integer<int32>(priority), to_integer<int32>(offset)));
send_request(make_tl_object<td_api::downloadFile>(i, to_integer<int32>(priority), to_integer<int32>(offset),
to_integer<int32>(limit)));
}
} else if (op == "cdf") {
send_request(make_tl_object<td_api::cancelDownloadFile>(as_file_id(args), false));

View File

@ -36,7 +36,7 @@ namespace td {
FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size,
string name, const FileEncryptionKey &encryption_key, bool is_small, bool search_file,
int64 offset, unique_ptr<Callback> callback)
int64 offset, int64 limit, unique_ptr<Callback> callback)
: remote_(remote)
, local_(local)
, size_(size)
@ -45,7 +45,8 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local
, callback_(std::move(callback))
, is_small_(is_small)
, search_file_(search_file)
, offset_(offset) {
, offset_(offset)
, limit_(limit) {
if (encryption_key.is_secret()) {
set_ordered_flag(true);
}
@ -114,6 +115,7 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation ||
(remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20)));
res.offset = offset_;
res.limit = limit_;
return res;
}
Status FileDownloader::on_ok(int64 size) {

View File

@ -34,7 +34,7 @@ class FileDownloader : public FileLoader {
};
FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool is_small, bool search_file, int64 offset,
const FileEncryptionKey &encryption_key, bool is_small, bool search_file, int64 offset, int64 limit,
unique_ptr<Callback> callback);
// Should just implement all parent pure virtual methods.
@ -58,6 +58,7 @@ class FileDownloader : public FileLoader {
bool is_small_;
bool search_file_{false};
int64 offset_;
int64 limit_;
bool use_cdn_ = false;
DcId cdn_dc_id_;

View File

@ -85,10 +85,10 @@ class FileDownloadGenerateActor : public FileGenerateActor {
};
send_closure(G()->file_manager(), &FileManager::download, file_id_, std::make_shared<Callback>(actor_id(this)), 1,
-1);
-1, -1);
}
void hangup() override {
send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0, -1);
send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0, -1, -1);
stop();
}

View File

@ -39,7 +39,8 @@ ActorOwn<ResourceManager> &FileLoadManager::get_download_resource_manager(bool i
void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_location,
const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority) {
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
int8 priority) {
if (stop_flag_) {
return;
}
@ -50,8 +51,9 @@ void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_
node->query_id_ = id;
auto callback = make_unique<FileDownloaderCallback>(actor_shared(this, node_id));
bool is_small = size < 20 * 1024;
node->loader_ = create_actor<FileDownloader>("Downloader", remote_location, local, size, std::move(name),
encryption_key, is_small, search_file, offset, std::move(callback));
node->loader_ =
create_actor<FileDownloader>("Downloader", remote_location, local, size, std::move(name), encryption_key,
is_small, search_file, offset, limit, std::move(callback));
DcId dc_id = remote_location.is_web() ? G()->get_webfile_dc_id() : remote_location.get_dc_id();
auto &resource_manager = get_download_resource_manager(is_small, dc_id);
send_closure(resource_manager, &ResourceManager::register_worker,
@ -170,6 +172,20 @@ void FileLoadManager::update_download_offset(QueryId id, int64 offset) {
}
send_closure(node->loader_, &FileLoaderActor::update_download_offset, offset);
}
void FileLoadManager::update_download_limit(QueryId id, int64 limit) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_download_limit, limit);
}
void FileLoadManager::hangup() {
nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); });
stop_flag_ = true;

View File

@ -49,7 +49,8 @@ class FileLoadManager final : public Actor {
explicit FileLoadManager(ActorShared<Callback> callback, ActorShared<> parent);
void download(QueryId id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size,
string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority);
string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
int8 priority);
void upload(QueryId id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts);
void upload_by_hash(QueryId id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
@ -58,6 +59,7 @@ class FileLoadManager final : public Actor {
void cancel(QueryId id);
void update_local_file_location(QueryId id, const LocalFileLocation &local);
void update_download_offset(QueryId id, int64 offset);
void update_download_limit(QueryId id, int64 limit);
void get_content(const FullLocalFileLocation &local_location, Promise<BufferSlice> promise);
private:

View File

@ -74,6 +74,12 @@ void FileLoader::update_download_offset(int64 offset) {
loop();
}
void FileLoader::update_download_limit(int64 limit) {
parts_manager_.set_streaming_limit(limit);
update_estimated_limit();
loop();
}
void FileLoader::start_up() {
auto r_file_info = init();
if (r_file_info.is_error()) {
@ -98,6 +104,7 @@ void FileLoader::start_up() {
parts_manager_.set_checked_prefix_size(0);
}
parts_manager_.set_streaming_offset(file_info.offset);
parts_manager_.set_streaming_limit(file_info.limit);
if (ordered_flag_) {
ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
}
@ -205,9 +212,9 @@ void FileLoader::update_estimated_limit() {
if (stop_flag_) {
return;
}
auto estimated_exta = parts_manager_.get_expected_size() - parts_manager_.get_ready_size();
resource_state_.update_estimated_limit(estimated_exta);
VLOG(files) << "update estimated limit " << estimated_exta;
auto estimated_extra = parts_manager_.get_estimated_extra();
resource_state_.update_estimated_limit(estimated_extra);
VLOG(files) << "update estimated limit " << estimated_extra;
if (!resource_manager_.empty()) {
keep_fd_flag(narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size());
send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);

View File

@ -40,6 +40,7 @@ class FileLoader : public FileLoaderActor {
void update_local_file_location(const LocalFileLocation &local) override;
void update_download_offset(int64 offset) override;
void update_download_limit(int64 limit) override;
protected:
void set_ordered_flag(bool flag);
@ -59,6 +60,7 @@ class FileLoader : public FileLoaderActor {
bool only_check = false;
bool need_delay = false;
int64 offset{0};
int64 limit{0};
};
virtual Result<FileInfo> init() TD_WARN_UNUSED_RESULT = 0;
virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0;

View File

@ -28,6 +28,8 @@ class FileLoaderActor : public NetQueryCallback {
}
virtual void update_download_offset(int64 offset) {
}
virtual void update_download_limit(int64 limit) {
}
};
} // namespace td

View File

@ -158,6 +158,17 @@ void FileNode::set_download_offset(int64 download_offset) {
recalc_ready_prefix_size(-1, -1);
on_info_changed();
}
void FileNode::set_download_limit(int64 download_limit) {
if (download_limit < 0) {
return;
}
if (download_limit == download_limit_) {
return;
}
download_limit_ = download_limit;
is_download_offset_dirty_ = true;
}
void FileNode::drop_local_location() {
set_local_location(LocalFileLocation(), 0, -1, -1);
@ -1821,8 +1832,8 @@ void FileManager::delete_file(FileId file_id, Promise<Unit> promise, const char
promise.set_value(Unit());
}
void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority,
int64 offset) {
void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority, int64 offset,
int64 limit) {
LOG(INFO) << "Download file " << file_id << " with priority " << new_priority;
auto node = get_sync_file_node(file_id);
if (!node) {
@ -1870,6 +1881,7 @@ void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> cal
LOG(INFO) << "Change download priority of file " << file_id << " to " << new_priority;
node->set_download_offset(offset);
node->set_download_limit(limit);
auto *file_info = get_file_id_info(file_id);
CHECK(new_priority == 0 || callback);
file_info->download_priority_ = narrow_cast<int8>(new_priority);
@ -1913,9 +1925,16 @@ void FileManager::run_download(FileNodePtr node) {
bool need_update_offset = node->is_download_offset_dirty_;
node->is_download_offset_dirty_ = false;
bool need_update_limit = node->is_download_limit_dirty_;
node->is_download_limit_dirty_ = false;
if (old_priority != 0) {
CHECK(node->download_id_ != 0);
send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority);
if (need_update_limit) {
auto download_limit = node->download_limit_;
send_closure(file_load_manager_, &FileLoadManager::update_download_limit, node->download_id_, download_limit);
}
if (need_update_offset) {
auto download_offset = file_view.is_encrypted_any() ? 0 : node->download_offset_;
send_closure(file_load_manager_, &FileLoadManager::update_download_offset, node->download_id_, download_offset);
@ -1960,9 +1979,10 @@ void FileManager::run_download(FileNodePtr node) {
<< node->remote_.full.value() << " with suggested name " << node->suggested_name() << " and encyption key "
<< node->encryption_key_;
auto download_offset = file_view.is_encrypted_any() ? 0 : node->download_offset_;
auto download_limit = node->download_limit_;
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full.value(), node->local_,
node->size_, node->suggested_name(), node->encryption_key_, node->can_search_locally_, download_offset,
priority);
download_limit, priority);
}
class ForceUploadActor : public Actor {
@ -3143,7 +3163,9 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
if (status.code() == 0) {
// Remove partial locations
if (node->local_.type() == LocalFileLocation::Type::Partial &&
!begins_with(status.message(), "FILE_UPLOAD_RESTART")) {
!begins_with(status.message(), "FILE_UPLOAD_RESTART") &&
!begins_with(status.message(), "FILE_DOWNLOAD_RESTART") &&
!begins_with(status.message(), "FILE_DOWNLOAD_LIMIT")) {
CSlice path = node->local_.partial().path_;
if (begins_with(path, get_files_temp_dir(FileType::Encrypted)) ||
begins_with(path, get_files_temp_dir(FileType::Video))) {

View File

@ -103,6 +103,7 @@ class FileNode {
void set_generate_priority(int8 download_priority, int8 upload_priority);
void set_download_offset(int64 download_offset);
void set_download_limit(int64 download_limit);
void on_changed();
void on_info_changed();
@ -123,6 +124,7 @@ class FileNode {
LocalFileLocation local_;
FileLoadManager::QueryId upload_id_ = 0;
int64 download_offset_ = 0;
int64 download_limit_ = 0;
int64 local_ready_size_ = 0; // PartialLocal only
int64 local_ready_prefix_size_ = 0; // PartialLocal only
@ -155,6 +157,7 @@ class FileNode {
int8 main_file_id_priority_ = 0;
bool is_download_offset_dirty_ = false;
bool is_download_limit_dirty_ = false;
bool get_by_hash_ = false;
bool can_search_locally_{true};
@ -395,7 +398,8 @@ class FileManager : public FileLoadManager::Callback {
bool set_encryption_key(FileId file_id, FileEncryptionKey key);
bool set_content(FileId file_id, BufferSlice bytes);
void download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority, int64 offset);
void download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority, int64 offset,
int64 limit);
void upload(FileId file_id, std::shared_ptr<UploadCallback> callback, int32 new_priority, uint64 upload_order);
void resume_upload(FileId file_id, std::vector<int> bad_parts, std::shared_ptr<UploadCallback> callback,
int32 new_priority, uint64 upload_order, bool force = false);

View File

@ -49,6 +49,9 @@ void PartsManager::set_streaming_offset(int64 offset) {
part_status_.resize(part_count_, PartStatus::Empty);
}
}
void PartsManager::set_streaming_limit(int64 limit) {
streaming_limit_ = limit;
}
Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &ready_parts) {
unknown_size_flag_ = true;
@ -198,6 +201,27 @@ Result<Part> PartsManager::start_part() {
}
}
}
if (streaming_limit_ != 0) {
auto offset = static_cast<int64>(part_i * get_part_size());
int64 ready = 0;
if (part_i == first_streaming_empty_part_) {
if (offset > streaming_offset_) {
ready += offset - streaming_offset_;
}
} else {
ready += offset;
CHECK(!unknown_size_flag_);
if (streaming_offset_ < size_) {
ready += size_ - streaming_offset_;
}
}
if (ready >= streaming_limit_) {
return Status::Error("FILE_DOWNLOAD_LIMIT");
}
}
CHECK(part_status_[part_i] == PartStatus::Empty);
on_part_start(part_i);
return get_part(part_i);
@ -296,6 +320,14 @@ int64 PartsManager::get_size_or_zero() const {
return size_;
}
int64 PartsManager::get_estimated_extra() const {
auto total_estimated_extra = get_expected_size() - get_ready_size();
if (streaming_limit_ != 0) {
return std::min(streaming_limit_, total_estimated_extra);
}
return total_estimated_extra;
}
int64 PartsManager::get_ready_size() const {
return ready_size_;
}

View File

@ -36,12 +36,14 @@ class PartsManager {
void set_need_check();
void set_checked_prefix_size(int64 size);
void set_streaming_offset(int64 offset);
void set_streaming_limit(int64 limit);
int64 get_checked_prefix_size() const;
int64 get_unchecked_ready_prefix_size();
int64 get_size() const;
int64 get_size_or_zero() const;
int64 get_expected_size() const;
int64 get_estimated_extra() const;
int64 get_ready_size() const;
size_t get_part_size() const;
int32 get_part_count() const;
@ -75,6 +77,7 @@ class PartsManager {
int first_empty_part_;
int first_not_ready_part_;
int64 streaming_offset_{0};
int64 streaming_limit_{0};
int first_streaming_empty_part_;
vector<PartStatus> part_status_;
Bitmask bitmask_;

View File

@ -31,6 +31,14 @@ class ResourceState {
bool update_estimated_limit(int64 extra) {
auto new_estimated_limit = used_ + extra;
// Use extra extra limit
if (new_estimated_limit < limit_) {
auto extra_limit = limit_ - new_estimated_limit;
used_ += extra_limit;
new_estimated_limit += extra_limit;
}
if (new_estimated_limit == estimated_limit_) {
return false;
}