From 3b238f6fbaefc7b4cf1b64d1b19300e040bc5ffd Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Sun, 11 Nov 2018 15:38:04 +0400 Subject: [PATCH] Files streaming GitOrigin-RevId: 78df1cd44c95380cd4af46f4db809ce28876db1f --- CMakeLists.txt | 2 + td/generate/scheme/td_api.tl | 16 ++- td/generate/scheme/td_api.tlo | Bin 139476 -> 139804 bytes td/telegram/InlineQueriesManager.cpp | 6 +- td/telegram/MessagesManager.cpp | 2 +- td/telegram/Td.cpp | 23 +++- td/telegram/Td.h | 4 + td/telegram/cli.cpp | 25 +++- td/telegram/files/FileBitmask.cpp | 84 ++++++++++++ td/telegram/files/FileBitmask.h | 50 +++++++ td/telegram/files/FileDownloader.cpp | 33 +++-- td/telegram/files/FileDownloader.h | 6 +- td/telegram/files/FileGenerateManager.cpp | 10 +- td/telegram/files/FileLoadManager.cpp | 18 ++- td/telegram/files/FileLoadManager.h | 3 +- td/telegram/files/FileLoader.cpp | 19 ++- td/telegram/files/FileLoader.h | 8 +- td/telegram/files/FileLoaderActor.h | 4 +- td/telegram/files/FileLocation.h | 17 ++- td/telegram/files/FileManager.cpp | 151 ++++++++++++++++------ td/telegram/files/FileManager.h | 17 ++- td/telegram/files/FileUploader.cpp | 8 +- td/telegram/files/FileUploader.h | 3 +- td/telegram/files/PartsManager.cpp | 59 ++++++++- td/telegram/files/PartsManager.h | 7 + tdutils/td/utils/misc.cpp | 50 +++++++ tdutils/td/utils/misc.h | 4 + test/tdclient.cpp | 2 +- 28 files changed, 526 insertions(+), 105 deletions(-) create mode 100644 td/telegram/files/FileBitmask.cpp create mode 100644 td/telegram/files/FileBitmask.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ad21f270..ca4d1631 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -358,6 +358,7 @@ set(TDLIB_SOURCE td/telegram/DialogParticipant.cpp td/telegram/DocumentsManager.cpp td/telegram/DraftMessage.cpp + td/telegram/files/FileBitmask.cpp td/telegram/files/FileDb.cpp td/telegram/files/FileDownloader.cpp td/telegram/files/FileFromBytes.cpp @@ -478,6 +479,7 @@ set(TDLIB_SOURCE td/telegram/DialogParticipant.h td/telegram/DocumentsManager.h td/telegram/DraftMessage.h + td/telegram/files/FileBitmask.h td/telegram/files/FileDb.h td/telegram/files/FileDownloader.h td/telegram/files/FileFromBytes.h diff --git a/td/generate/scheme/td_api.tl b/td/generate/scheme/td_api.tl index ef42de6a..9671e178 100644 --- a/td/generate/scheme/td_api.tl +++ b/td/generate/scheme/td_api.tl @@ -126,9 +126,10 @@ temporaryPasswordState has_password:Bool valid_for:int32 = TemporaryPasswordStat //@can_be_deleted True, if the file can be deleted //@is_downloading_active True, if the file is currently being downloaded (or a local copy is being generated by some other means) //@is_downloading_completed True, if the local copy is fully available +//@download_offset Download will be started from this offset. downloaded_prefix_size is calculated from this offset. //@downloaded_prefix_size If is_downloading_completed is false, then only some prefix of the file is ready to be read. downloaded_prefix_size is the size of that prefix //@downloaded_size Total downloaded file bytes. Should be used only for calculating download progress. The actual file size may be bigger, and some parts of it may contain garbage -localFile path:string can_be_downloaded:Bool can_be_deleted:Bool is_downloading_active:Bool is_downloading_completed:Bool downloaded_prefix_size:int32 downloaded_size:int32 = LocalFile; +localFile path:string can_be_downloaded:Bool can_be_deleted:Bool is_downloading_active:Bool is_downloading_completed:Bool download_offset:int32 downloaded_prefix_size:int32 downloaded_size:int32 = LocalFile; //@description Represents a remote file //@id Remote file identifier; may be empty. Can be used across application restarts or even from other devices for the current user. If the ID starts with "http://" or "https://", it represents the HTTP URL of the file. TDLib is currently unable to download files if only their URL is known. @@ -2948,7 +2949,18 @@ setPinnedChats chat_ids:vector = Ok; //@description Asynchronously downloads a file from the cloud. updateFile will be used to notify about the download progress and successful completion of the download. Returns file state just after the download has been started //@file_id Identifier of the file to download //@priority Priority of the download (1-32). The higher the priority, the earlier the file will be downloaded. If the priorities of two files are equal, then the last one for which downloadFile was called will be downloaded first -downloadFile file_id:int32 priority:int32 = File; +//@offset File will be downloaded starting from offset first. Supposed to be used for streaming. +downloadFile file_id:int32 priority:int32 offset:int32 = File; + +//@description Set offset for file downloading +//@file_id Identifier of file +//@offset File download offset +setFileDownloadOffset file_id:int32 offset:int32 = File; + +//@description Get downloaded prefix from a given offset +//@file_id Identifier of file +//@offset Offset from which downloaded prefix is calculated +getFileDownloadedPrefix file_id:int32 offset:int32 = Count; //@description Stops the downloading of a file. If a file has already been downloaded, does nothing @file_id Identifier of a file to stop downloading @only_if_pending Pass true to stop downloading only if it hasn't been started, i.e. request hasn't been sent to server cancelDownloadFile file_id:int32 only_if_pending:Bool = Ok; diff --git a/td/generate/scheme/td_api.tlo b/td/generate/scheme/td_api.tlo index 3f98fc39dc7c06bcf2318831f00c57c42c032f7e..59d436559683959ba142d86d26b8210710db2c85 100644 GIT binary patch delta 265 zcmca|kYmmfjtzeVSPrXH?b;-GCl<`u{2*^lpd?6$GbcYeF~==4CzXK##J_n=$cJOH z<6-s5XAbNT;7`df&&$bAOo`7=ODj$-nLZ(rQF3zY!DxsnPYxb9#OSbnX$xb&qU;_y zwbMK>^$-IYDw5Xff=pzd9=MFreUdZl^u2ExrKcO!-Qv!-o(=sbqK3e!r_gTiMGyTJLMw!V01{{-pk_5I}>|=c3A+=j;)@)IrlaMv~ ULmcV0jL`_J1!9o|JJTLX089OA;s5{u delta 125 zcmbPpgyYIVjtzeVSlZgVj5Z10i3Kw@Kge4XC copy(const tl_object_ptr &obj) { template <> td_api::object_ptr copy(const td_api::localFile &obj) { - return td_api::make_object(obj.path_, obj.can_be_downloaded_, obj.can_be_deleted_, - obj.is_downloading_active_, obj.is_downloading_completed_, - obj.downloaded_prefix_size_, obj.downloaded_size_); + return td_api::make_object( + obj.path_, obj.can_be_downloaded_, obj.can_be_deleted_, obj.is_downloading_active_, obj.is_downloading_completed_, + obj.download_offset_, obj.downloaded_prefix_size_, obj.downloaded_size_); } template <> td_api::object_ptr copy(const td_api::remoteFile &obj) { diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index a958056b..e404fb69 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -6224,7 +6224,7 @@ void MessagesManager::load_secret_thumbnail(FileId thumbnail_file_id) { }); send_closure(G()->file_manager(), &FileManager::download, thumbnail_file_id, - std::make_shared(std::move(download_promise)), 1); + std::make_shared(std::move(download_promise)), 1, -1); } void MessagesManager::on_upload_media(FileId file_id, tl_object_ptr input_file, diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 11b5ea46..d69c870e 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -4858,6 +4858,15 @@ void Td::on_request(uint64 id, const td_api::getFile &request) { send_closure(actor_id(this), &Td::send_result, id, file_manager_->get_file_object(FileId(request.file_id_, 0))); } +void Td::on_request(uint64 id, const td_api::getFileDownloadedPrefix &request) { + auto file_view = file_manager_->get_file_view(FileId(request.file_id_, 0)); + if (file_view.empty()) { + return send_closure(actor_id(this), &Td::send_error, id, Status::Error(10, "Unknown file id")); + } + send_closure(actor_id(this), &Td::send_result, id, + td_api::make_object(static_cast(file_view.downloaded_prefix(request.offset_)))); +} + void Td::on_request(uint64 id, td_api::getRemoteFile &request) { CLEAN_INPUT_STRING(request.remote_file_id_); auto r_file_id = file_manager_->from_persistent_id( @@ -5668,7 +5677,7 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) { if (!(1 <= priority && priority <= 32)) { return send_error_raw(id, 5, "Download priority must be in [1;32] range"); } - file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority); + file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority, request.offset_); auto file = file_manager_->get_file_object(FileId(request.file_id_, 0), false); if (file->id_ == 0) { @@ -5678,8 +5687,18 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) { send_closure(actor_id(this), &Td::send_result, id, std::move(file)); } +void Td::on_request(uint64 id, const td_api::setFileDownloadOffset &request) { + file_manager_->download_set_offset(FileId(request.file_id_, 0), request.offset_); + auto file = file_manager_->get_file_object(FileId(request.file_id_, 0), false); + if (file->id_ == 0) { + return send_error_raw(id, 400, "Invalid file id"); + } + + send_closure(actor_id(this), &Td::send_result, id, std::move(file)); +} + void Td::on_request(uint64 id, const td_api::cancelDownloadFile &request) { - file_manager_->download(FileId(request.file_id_, 0), nullptr, request.only_if_pending_ ? -1 : 0); + file_manager_->download(FileId(request.file_id_, 0), nullptr, 0, request.only_if_pending_ ? -1 : 0); send_closure(actor_id(this), &Td::send_result, id, make_tl_object()); } diff --git a/td/telegram/Td.h b/td/telegram/Td.h index 747c8b2a..c7b4f82c 100644 --- a/td/telegram/Td.h +++ b/td/telegram/Td.h @@ -442,6 +442,8 @@ class Td final : public NetQueryCallback { void on_request(uint64 id, const td_api::getFile &request); + void on_request(uint64 id, const td_api::getFileDownloadedPrefix &request); + void on_request(uint64 id, td_api::getRemoteFile &request); void on_request(uint64 id, td_api::getStorageStatistics &request); @@ -644,6 +646,8 @@ class Td final : public NetQueryCallback { void on_request(uint64 id, const td_api::downloadFile &request); + void on_request(uint64 id, const td_api::setFileDownloadOffset &request); + void on_request(uint64 id, const td_api::cancelDownloadFile &request); void on_request(uint64 id, td_api::uploadFile &request); diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index b013e36b..7a73a234 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -2251,6 +2251,11 @@ class CliClient final : public Actor { send_request(make_tl_object(as_chat_id(chat_id), to_integer(date))); } else if (op == "gf" || op == "GetFile") { send_request(make_tl_object(as_file_id(args))); + } else if (op == "gfp" || op == "GetFileDownloadedPrefix") { + string file_id; + string offset; + std::tie(file_id, offset) = split(args); + send_request(make_tl_object(as_file_id(file_id), to_integer(offset))); } else if (op == "grf") { send_request(make_tl_object(args, nullptr)); } else if (op == "gmtf") { @@ -2271,26 +2276,38 @@ class CliClient final : public Actor { send_request(make_tl_object( as_location(latitude, longitude), to_integer(zoom), to_integer(width), to_integer(height), to_integer(scale), as_chat_id(chat_id))); + } else if (op == "sfdo" || op == "SetDownloadFileOffset") { + string file_id_str; + string offset; + std::tie(file_id_str, offset) = split(args); + + auto file_id = as_file_id(file_id_str); + send_request(make_tl_object(file_id, to_integer(offset))); } else if (op == "df" || op == "DownloadFile") { string file_id_str; string priority; - std::tie(file_id_str, priority) = split(args); + string offset; + std::tie(file_id_str, args) = split(args); + std::tie(priority, offset) = split(args); if (priority.empty()) { priority = "1"; } auto file_id = as_file_id(file_id_str); - send_request(make_tl_object(file_id, to_integer(priority))); + send_request( + make_tl_object(file_id, to_integer(priority), to_integer(offset))); } else if (op == "dff") { string file_id; string priority; - std::tie(file_id, priority) = split(args); + string offset; + std::tie(file_id, args) = split(args); + std::tie(priority, offset) = split(args); if (priority.empty()) { priority = "1"; } for (int i = 1; i <= as_file_id(file_id); i++) { - send_request(make_tl_object(i, to_integer(priority))); + send_request(make_tl_object(i, to_integer(priority), to_integer(offset))); } } else if (op == "cdf") { send_request(make_tl_object(as_file_id(args), false)); diff --git a/td/telegram/files/FileBitmask.cpp b/td/telegram/files/FileBitmask.cpp new file mode 100644 index 00000000..004d5f40 --- /dev/null +++ b/td/telegram/files/FileBitmask.cpp @@ -0,0 +1,84 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/files/FileBitmask.h" +#include "td/utils/misc.h" +namespace td { +Bitmask::Bitmask(Decode, Slice data) : data_(zero_one_decode(data)) { +} +Bitmask::Bitmask(Ones, int64 count) : data_((count + 7) / 8, '\0') { + for (int64 i = 0; i < count; i++) { + set(i); + } +} +std::string Bitmask::encode() const { + // remove zeroes in the end to make encoding deteministic + td::Slice data(data_); + while (!data.empty() && data.back() == 0) { + data.remove_suffix(1); + } + return zero_one_encode(data_); +} +Bitmask::ReadySize Bitmask::get_ready_size(int64 offset, int64 part_size) const { + ReadySize res; + res.offset = offset; + auto offset_part = offset / part_size; + auto ones = get_ready_parts(offset_part); + if (ones == 0) { + res.ready_size = 0; + } else { + res.ready_size = (offset_part + ones) * part_size - offset; + } + CHECK(res.ready_size >= 0); + return res; +} +int64 Bitmask::get_total_size(int64 part_size) const { + int64 res = 0; + for (int64 i = 0; i < size(); i++) { + res += get(i); + } + return res * part_size; +} +bool Bitmask::get(int64 offset) const { + if (offset < 0) { + return 0; + } + if (offset / 8 >= narrow_cast(data_.size())) { + return 0; + } + return (data_[offset / 8] & (1 << (offset % 8))) != 0; +} + +int64 Bitmask::get_ready_parts(int64 offset) const { + int64 res = 0; + while (get(offset + res)) { + res++; + } + return res; +}; + +std::vector Bitmask::as_vector() const { + std::vector res; + for (int32 i = 0; i < narrow_cast(data_.size() * 8); i++) { + if (get(i)) { + res.push_back(i); + } + } + return res; +} +void Bitmask::set(int64 offset) { + auto need_size = narrow_cast(offset / 8 + 1); + if (need_size > data_.size()) { + data_.resize(need_size, 0); + } + data_[need_size - 1] |= (1 << (offset % 8)); +} + +int64 Bitmask::size() const { + return data_.size() * 8; +} + +} // namespace td diff --git a/td/telegram/files/FileBitmask.h b/td/telegram/files/FileBitmask.h new file mode 100644 index 00000000..72ba398d --- /dev/null +++ b/td/telegram/files/FileBitmask.h @@ -0,0 +1,50 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/utils/common.h" +#include "td/utils/Slice.h" +#include "td/utils/StringBuilder.h" + +namespace td { +class Bitmask { + public: + struct ReadySize { + int64 offset{-1}; + int64 ready_size{-1}; + bool empty() const { + return offset == -1; + } + }; + struct Decode {}; + struct Ones {}; + Bitmask() = default; + Bitmask(Decode, Slice data); + Bitmask(Ones, int64 count); + std::string encode() const; + ReadySize get_ready_size(int64 offset, int64 part_size) const; + int64 get_total_size(int64 part_size) const; + bool get(int64 offset) const; + + int64 get_ready_parts(int64 offset) const; + + std::vector as_vector() const; + void set(int64 offset); + int64 size() const; + + private: + std::string data_; +}; + +inline StringBuilder &operator<<(StringBuilder &sb, const Bitmask &mask) { + std::string res; + for (int64 i = 0; i < mask.size(); i++) { + res += mask.get(i) ? "1" : "0"; + } + return sb << res; +} + +} // namespace td diff --git a/td/telegram/files/FileDownloader.cpp b/td/telegram/files/FileDownloader.cpp index b7dbbf8c..25cb88a1 100644 --- a/td/telegram/files/FileDownloader.cpp +++ b/td/telegram/files/FileDownloader.cpp @@ -32,7 +32,7 @@ namespace td { FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size, string name, const FileEncryptionKey &encryption_key, bool is_small, bool search_file, - unique_ptr callback) + int64 offset, unique_ptr callback) : remote_(remote) , local_(local) , size_(size) @@ -40,10 +40,14 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local , encryption_key_(encryption_key) , callback_(std::move(callback)) , is_small_(is_small) - , search_file_(search_file) { + , search_file_(search_file) + , offset_(offset) { if (encryption_key.is_secret()) { set_ordered_flag(true); } + if (!encryption_key.empty()) { + CHECK(offset_ == 0); + } } Result FileDownloader::init() { @@ -59,22 +63,22 @@ Result FileDownloader::init() { if (remote_.file_type_ == FileType::Secure) { size_ = 0; } - int ready_part_count = 0; int32 part_size = 0; + Bitmask bitmask{Bitmask::Ones{}, 0}; if (local_.type() == LocalFileLocation::Type::Partial) { const auto &partial = local_.partial(); path_ = partial.path_; auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read); // TODO: check timestamps.. if (result_fd.is_ok()) { + bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_); if (encryption_key_.is_secret()) { CHECK(partial.iv_.size() == 32) << partial.iv_.size(); encryption_key_.mutable_iv() = as(partial.iv_.data()); - next_part_ = partial.ready_part_count_; + next_part_ = narrow_cast(bitmask.get_ready_parts(0)); } fd_ = result_fd.move_as_ok(); part_size = partial.part_size_; - ready_part_count = partial.ready_part_count_; } } if (search_file_ && fd_.empty() && size_ > 0 && size_ < 1000 * (1 << 20) && encryption_key_.empty() && @@ -88,27 +92,23 @@ Result FileDownloader::init() { need_check_ = true; only_check_ = true; part_size = 32 * (1 << 10); - ready_part_count = narrow_cast((size_ + part_size - 1) / part_size); + bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size}; return Status::OK(); }(); } - std::vector parts(ready_part_count); - for (int i = 0; i < ready_part_count; i++) { - parts[i] = i; - } - FileInfo res; res.size = size_; res.is_size_final = true; res.part_size = part_size; - res.ready_parts = std::move(parts); + res.ready_parts = bitmask.as_vector(); res.use_part_count_limit = false; res.only_check = only_check_; res.need_delay = !is_small_ && (remote_.file_type_ == FileType::VideoNote || remote_.file_type_ == FileType::VoiceNote || remote_.file_type_ == FileType::Audio || remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation || (remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20))); + res.offset = offset_; return res; } Status FileDownloader::on_ok(int64 size) { @@ -345,8 +345,8 @@ Result FileDownloader::process_part(Part part, NetQueryPtr net_query) { } return written; } -void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, - int64 ready_size) { +void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask, + bool is_ready, int64 ready_size) { if (is_ready) { // do not send partial location. will lead to wrong local_size return; @@ -355,7 +355,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_ return; } if (encryption_key_.empty() || encryption_key_.is_secure()) { - callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, path_, part_size, ready_part_count, ""}, + callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, path_, part_size, "", ready_bitmask}, ready_size); } else if (encryption_key_.is_secret()) { UInt256 iv; @@ -365,8 +365,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_ LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_); } callback_->on_partial_download( - PartialLocalFileLocation{remote_.file_type_, path_, part_size, ready_part_count, as_slice(iv).str()}, - ready_size); + PartialLocalFileLocation{remote_.file_type_, path_, part_size, as_slice(iv).str(), ready_bitmask}, ready_size); } else { UNREACHABLE(); } diff --git a/td/telegram/files/FileDownloader.h b/td/telegram/files/FileDownloader.h index 88db9629..4c5d23e2 100644 --- a/td/telegram/files/FileDownloader.h +++ b/td/telegram/files/FileDownloader.h @@ -33,7 +33,7 @@ class FileDownloader : public FileLoader { }; FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size, string name, - const FileEncryptionKey &encryption_key, bool is_small, bool search_file, + const FileEncryptionKey &encryption_key, bool is_small, bool search_file, int64 offset, unique_ptr callback); // Should just implement all parent pure virtual methods. @@ -56,6 +56,7 @@ class FileDownloader : public FileLoader { bool next_part_stop_ = false; bool is_small_; bool search_file_{false}; + int64 offset_; bool use_cdn_ = false; DcId cdn_dc_id_; @@ -84,7 +85,8 @@ class FileDownloader : public FileLoader { Result should_restart_part(Part part, NetQueryPtr &net_query) override TD_WARN_UNUSED_RESULT; Result> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT; Result process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT; - void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, int64 ready_size) override; + void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask, bool is_ready, + int64 ready_size) override; FileLoader::Callback *get_callback() override; Status process_check_query(NetQueryPtr net_query) override; Result check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) override; diff --git a/td/telegram/files/FileGenerateManager.cpp b/td/telegram/files/FileGenerateManager.cpp index 4dd7f2a3..5ec07053 100644 --- a/td/telegram/files/FileGenerateManager.cpp +++ b/td/telegram/files/FileGenerateManager.cpp @@ -83,10 +83,11 @@ class FileDownloadGenerateActor : public FileGenerateActor { ActorId parent_; }; - send_closure(G()->file_manager(), &FileManager::download, file_id_, std::make_shared(actor_id(this)), 1); + send_closure(G()->file_manager(), &FileManager::download, file_id_, std::make_shared(actor_id(this)), 1, + -1); } void hangup() override { - send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0); + send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0, 0); stop(); } @@ -302,8 +303,9 @@ class FileExternalGenerateActor : public FileGenerateActor { if (local_prefix_size < 0) { return Status::Error(1, "Invalid local prefix size"); } - callback_->on_partial_generate( - PartialLocalFileLocation{generate_location_.file_type_, path_, 1, local_prefix_size, ""}, expected_size); + callback_->on_partial_generate(PartialLocalFileLocation{generate_location_.file_type_, path_, local_prefix_size, + Bitmask(Bitmask::Ones{}, 1).encode(), ""}, + expected_size); return Status::OK(); } diff --git a/td/telegram/files/FileLoadManager.cpp b/td/telegram/files/FileLoadManager.cpp index 612132d3..e1c0f724 100644 --- a/td/telegram/files/FileLoadManager.cpp +++ b/td/telegram/files/FileLoadManager.cpp @@ -39,7 +39,7 @@ ActorOwn &FileLoadManager::get_download_resource_manager(bool i void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size, string name, - const FileEncryptionKey &encryption_key, bool search_file, int8 priority) { + const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority) { if (stop_flag_) { return; } @@ -51,7 +51,7 @@ void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_ auto callback = make_unique(actor_shared(this, node_id)); bool is_small = size < 20 * 1024; node->loader_ = create_actor("Downloader", remote_location, local, size, std::move(name), - encryption_key, is_small, search_file, std::move(callback)); + encryption_key, is_small, search_file, offset, std::move(callback)); DcId dc_id = remote_location.is_web() ? G()->get_webfile_dc_id() : remote_location.get_dc_id(); auto &resource_manager = get_download_resource_manager(is_small, dc_id); send_closure(resource_manager, &ResourceManager::register_worker, @@ -156,6 +156,20 @@ void FileLoadManager::update_local_file_location(QueryId id, const LocalFileLoca } send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local); } +void FileLoadManager::update_download_offset(QueryId id, int64 offset) { + if (stop_flag_) { + return; + } + auto it = query_id_to_node_id_.find(id); + if (it == query_id_to_node_id_.end()) { + return; + } + auto node = nodes_container_.get(it->second); + if (node == nullptr) { + return; + } + send_closure(node->loader_, &FileLoaderActor::update_download_offset, offset); +} void FileLoadManager::hangup() { nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); }); stop_flag_ = true; diff --git a/td/telegram/files/FileLoadManager.h b/td/telegram/files/FileLoadManager.h index 7dc92d59..a7c416b6 100644 --- a/td/telegram/files/FileLoadManager.h +++ b/td/telegram/files/FileLoadManager.h @@ -46,7 +46,7 @@ class FileLoadManager final : public Actor { explicit FileLoadManager(ActorShared callback, ActorShared<> parent); void download(QueryId id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size, - string name, const FileEncryptionKey &encryption_key, bool search_file, int8 priority); + string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority); void upload(QueryId id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location, int64 size, const FileEncryptionKey &encryption_key, int8 priority, vector bad_parts); void upload_by_hash(QueryId id, const FullLocalFileLocation &local_location, int64 size, int8 priority); @@ -54,6 +54,7 @@ class FileLoadManager final : public Actor { void from_bytes(QueryId id, FileType type, BufferSlice bytes, string name); void cancel(QueryId id); void update_local_file_location(QueryId id, const LocalFileLocation &local); + void update_download_offset(QueryId id, int64 offset); void get_content(const FullLocalFileLocation &local_location, Promise promise); private: diff --git a/td/telegram/files/FileLoader.cpp b/td/telegram/files/FileLoader.cpp index 666490eb..1f1a9dcd 100644 --- a/td/telegram/files/FileLoader.cpp +++ b/td/telegram/files/FileLoader.cpp @@ -63,6 +63,15 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) { loop(); } +void FileLoader::update_download_offset(int64 offset) { + parts_manager_.set_streaming_offset(offset); + //TODO: cancel only some queries + for (auto &it : part_map_) { + it.second.second.reset(); // cancel_query(it.second.second); + } + loop(); +} + void FileLoader::start_up() { auto r_file_info = init(); if (r_file_info.is_error()) { @@ -78,14 +87,15 @@ void FileLoader::start_up() { auto &ready_parts = file_info.ready_parts; auto use_part_count_limit = file_info.use_part_count_limit; auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit); - if (file_info.only_check) { - parts_manager_.set_checked_prefix_size(0); - } if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; return; } + if (file_info.only_check) { + parts_manager_.set_checked_prefix_size(0); + } + parts_manager_.set_streaming_offset(file_info.offset); if (ordered_flag_) { ordered_parts_ = OrderedEventsProcessor>(parts_manager_.get_ready_prefix_count()); } @@ -289,6 +299,7 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) { void FileLoader::on_progress_impl(size_t size) { on_progress(parts_manager_.get_part_count(), static_cast(parts_manager_.get_part_size()), - parts_manager_.get_ready_prefix_count(), parts_manager_.ready(), parts_manager_.get_ready_size()); + parts_manager_.get_ready_prefix_count(), parts_manager_.get_bitmask(), parts_manager_.ready(), + parts_manager_.get_ready_size()); } } // namespace td diff --git a/td/telegram/files/FileLoader.h b/td/telegram/files/FileLoader.h index 1da79773..a8f934ac 100644 --- a/td/telegram/files/FileLoader.h +++ b/td/telegram/files/FileLoader.h @@ -38,6 +38,7 @@ class FileLoader : public FileLoaderActor { void update_resources(const ResourceState &other) override; void update_local_file_location(const LocalFileLocation &local) override; + void update_download_offset(int64 offset) override; protected: void set_ordered_flag(bool flag); @@ -49,13 +50,14 @@ class FileLoader : public FileLoaderActor { }; struct FileInfo { int64 size; - int64 expected_size = 0; + int64 expected_size{0}; bool is_size_final; int32 part_size; std::vector ready_parts; bool use_part_count_limit = true; bool only_check = false; bool need_delay = false; + int64 offset{0}; }; virtual Result init() TD_WARN_UNUSED_RESULT = 0; virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0; @@ -67,8 +69,8 @@ class FileLoader : public FileLoaderActor { virtual void after_start_parts() { } virtual Result process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT = 0; - virtual void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, - int64 ready_size) = 0; + virtual void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, std::string ready_bitmask, + bool is_ready, int64 ready_size) = 0; virtual Callback *get_callback() = 0; virtual Result on_update_local_location(const LocalFileLocation &location) TD_WARN_UNUSED_RESULT { return Status::Error("unsupported"); diff --git a/td/telegram/files/FileLoaderActor.h b/td/telegram/files/FileLoaderActor.h index 3e4646c7..27844564 100644 --- a/td/telegram/files/FileLoaderActor.h +++ b/td/telegram/files/FileLoaderActor.h @@ -22,9 +22,11 @@ class FileLoaderActor : public NetQueryCallback { virtual void update_priority(int8 priority) = 0; virtual void update_resources(const ResourceState &other) = 0; - // TODO: existence of this function is a dirty hack. Refactoring is highly appreciated + // TODO: existence of this two functions is a dirty hack. Refactoring is highly appreciated virtual void update_local_file_location(const LocalFileLocation &local) { } + virtual void update_download_offset(int64 offset) { + } }; } // namespace td diff --git a/td/telegram/files/FileLocation.h b/td/telegram/files/FileLocation.h index cb457741..a6221ae7 100644 --- a/td/telegram/files/FileLocation.h +++ b/td/telegram/files/FileLocation.h @@ -6,6 +6,7 @@ // #pragma once +#include "td/telegram/files/FileBitmask.h" #include "td/telegram/td_api.h" #include "td/telegram/telegram_api.h" @@ -909,8 +910,8 @@ struct PartialLocalFileLocation { FileType file_type_; string path_; int32 part_size_; - int32 ready_part_count_; string iv_; + string ready_bitmask_; template void store(StorerT &storer) const { @@ -918,8 +919,10 @@ struct PartialLocalFileLocation { store(file_type_, storer); store(path_, storer); store(part_size_, storer); - store(ready_part_count_, storer); + int32 deprecated_ready_part_count = -1; + store(deprecated_ready_part_count, storer); store(iv_, storer); + store(ready_bitmask_, storer); } template void parse(ParserT &parser) { @@ -930,14 +933,20 @@ struct PartialLocalFileLocation { } parse(path_, parser); parse(part_size_, parser); - parse(ready_part_count_, parser); + int32 deprecated_ready_part_count; + parse(deprecated_ready_part_count, parser); parse(iv_, parser); + if (deprecated_ready_part_count == -1) { + parse(ready_bitmask_, parser); + } else { + ready_bitmask_ = Bitmask(Bitmask::Ones{}, deprecated_ready_part_count).encode(); + } } }; inline bool operator==(const PartialLocalFileLocation &lhs, const PartialLocalFileLocation &rhs) { return lhs.file_type_ == rhs.file_type_ && lhs.path_ == rhs.path_ && lhs.part_size_ == rhs.part_size_ && - lhs.ready_part_count_ == rhs.ready_part_count_ && lhs.iv_ == rhs.iv_; + lhs.iv_ == rhs.iv_ && lhs.ready_bitmask_ == rhs.ready_bitmask_; } inline bool operator!=(const PartialLocalFileLocation &lhs, const PartialLocalFileLocation &rhs) { diff --git a/td/telegram/files/FileManager.cpp b/td/telegram/files/FileManager.cpp index e2a88d46..a3f9ad30 100644 --- a/td/telegram/files/FileManager.cpp +++ b/td/telegram/files/FileManager.cpp @@ -33,6 +33,9 @@ #include namespace td { +namespace { +constexpr int64 MAX_FILE_SIZE = 1500 * (1 << 20) /* 1500MB */; +} int VERBOSITY_NAME(update_file) = VERBOSITY_NAME(DEBUG); @@ -63,7 +66,48 @@ FileNodePtr::operator bool() const { return file_manager_ != nullptr && get_unsafe() != nullptr; } -void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_size) { +void FileNode::recalc_ready_prefix_size(Bitmask::ReadySize ready_prefix_size) { + if (local_.type() != LocalFileLocation::Type::Partial) { + return; + } + int64 new_local_ready_prefix_size; + if (download_offset_ == ready_prefix_size.offset) { + new_local_ready_prefix_size = ready_prefix_size.ready_size; + } else { + new_local_ready_prefix_size = Bitmask(Bitmask::Decode{}, local_.partial().ready_bitmask_) + .get_ready_size(download_offset_, local_.partial().part_size_) + .ready_size; + } + if (new_local_ready_prefix_size != local_ready_prefix_size_) { + local_ready_prefix_size_ = new_local_ready_prefix_size; + on_info_changed(); + } +} + +void FileNode::init_ready_size() { + if (local_.type() != LocalFileLocation::Type::Partial) { + return; + } + auto bitmask = Bitmask(Bitmask::Decode{}, local_.partial().ready_bitmask_); + local_ready_prefix_size_ = bitmask.get_ready_size(0, local_.partial().part_size_).ready_size; + local_ready_size_ = bitmask.get_total_size(local_.partial().part_size_); +} + +void FileNode::set_download_offset(int64 download_offset) { + if (download_offset < 0 || download_offset > MAX_FILE_SIZE) { + return; + } + if (download_offset == download_offset_) { + return; + } + download_offset_ = download_offset; + is_download_offset_dirty_ = true; + recalc_ready_prefix_size({}); + on_info_changed(); +} + +void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_size, + Bitmask::ReadySize ready_prefix_size) { if (local_ready_size_ != ready_size) { local_ready_size_ = ready_size; VLOG(update_file) << "File " << main_file_id_ << " has changed local ready size"; @@ -72,6 +116,9 @@ void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_si if (local_ != local) { VLOG(update_file) << "File " << main_file_id_ << " has changed local location"; local_ = local; + + recalc_ready_prefix_size(ready_prefix_size); + on_changed(); } } @@ -297,6 +344,28 @@ bool FileView::is_downloading() const { return node_->download_priority_ != 0 || node_->generate_download_priority_ != 0; } +int64 FileView::download_offset() const { + return node_->download_offset_; +} +int64 FileView::downloaded_prefix(int64 offset) const { + switch (node_->local_.type()) { + case LocalFileLocation::Type::Empty: + return 0; + case LocalFileLocation::Type::Full: + if (offset < node_->size_) { + return node_->size_ - offset; + } + return 0; + case LocalFileLocation::Type::Partial: + return Bitmask(Bitmask::Decode{}, node_->local_.partial().ready_bitmask_) + .get_ready_size(offset, node_->local_.partial().part_size_) + .ready_size; + default: + UNREACHABLE(); + return 0; + } +} + int64 FileView::local_size() const { switch (node_->local_.type()) { case LocalFileLocation::Type::Full: @@ -306,7 +375,7 @@ int64 FileView::local_size() const { // File is not decrypted yet return 0; } - return node_->local_.partial().part_size_ * node_->local_.partial().ready_part_count_; + return node_->local_ready_prefix_size_; } default: return 0; @@ -319,8 +388,7 @@ int64 FileView::local_total_size() const { case LocalFileLocation::Type::Full: return node_->size_; case LocalFileLocation::Type::Partial: - return max(static_cast(node_->local_.partial().part_size_) * node_->local_.partial().ready_part_count_, - node_->local_ready_size_); + return max(node_->local_ready_prefix_size_, node_->local_ready_size_); default: UNREACHABLE(); return 0; @@ -528,7 +596,6 @@ string FileManager::get_file_name(FileType file_type, Slice path) { Status FileManager::check_local_location(FullLocalFileLocation &location, int64 &size) { constexpr int64 MAX_THUMBNAIL_SIZE = 200 * (1 << 10) /* 200KB */; - constexpr int64 MAX_FILE_SIZE = 1500 * (1 << 20) /* 1500MB */; if (location.path_.empty()) { return Status::Error("File must have non-empty path"); @@ -1051,9 +1118,11 @@ Result FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy node->download_id_ = other_node->download_id_; node->is_download_started_ |= other_node->is_download_started_; node->set_download_priority(other_node->download_priority_); + node->set_download_offset(other_node->download_offset_); other_node->download_id_ = 0; other_node->is_download_started_ = false; other_node->download_priority_ = 0; + other_node->download_offset_ = 0; //cancel_generate(node); //node->set_generate_location(std::move(other_node->generate_)); @@ -1449,7 +1518,8 @@ void FileManager::delete_file(FileId file_id, Promise promise, const char promise.set_value(Unit()); } -void FileManager::download(FileId file_id, std::shared_ptr callback, int32 new_priority) { +void FileManager::download(FileId file_id, std::shared_ptr callback, int32 new_priority, + int64 offset) { LOG(INFO) << "Download file " << file_id << " with priority " << new_priority; auto node = get_sync_file_node(file_id); if (!node) { @@ -1496,7 +1566,7 @@ void FileManager::download(FileId file_id, std::shared_ptr cal } LOG(INFO) << "Change download priority of file " << file_id << " to " << new_priority; - + node->set_download_offset(offset); auto *file_info = get_file_id_info(file_id); CHECK(new_priority == 0 || callback); file_info->download_priority_ = narrow_cast(new_priority); @@ -1509,6 +1579,21 @@ void FileManager::download(FileId file_id, std::shared_ptr cal try_flush_node(node); } +void FileManager::download_set_offset(FileId file_id, int64 offset) { + auto file_node = get_sync_file_node(file_id); + if (!file_node) { + LOG(INFO) << "File " << file_id << " not found"; + return; + } + if (FileView(file_node).is_encrypted()) { + offset = 0; + } + file_node->set_download_offset(offset); + run_generate(file_node); + run_download(file_node); + try_flush_node(file_node); +} + void FileManager::run_download(FileNodePtr node) { if (node->need_load_from_pmc_) { return; @@ -1537,10 +1622,19 @@ void FileManager::run_download(FileNodePtr node) { } return; } + if (file_view.is_encrypted()) { + node->set_download_offset(0); + } + auto offset = node->download_offset_; + bool need_update_offset = node->is_download_offset_dirty_; + node->is_download_offset_dirty_ = false; if (old_priority != 0) { CHECK(node->download_id_ != 0); send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority); + if (need_update_offset) { + send_closure(file_load_manager_, &FileLoadManager::update_download_offset, node->download_id_, offset); + } return; } @@ -1553,7 +1647,8 @@ void FileManager::run_download(FileNodePtr node) { LOG(DEBUG) << "Run download of file " << file_id << " of size " << node->size_ << " from " << node->remote_.full() << " with suggested name " << node->suggested_name() << " and encyption key " << node->encryption_key_; send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full(), node->local_, node->size_, - node->suggested_name(), node->encryption_key_, node->can_search_locally_, priority); + node->suggested_name(), node->encryption_key_, node->can_search_locally_, node->download_offset_, + priority); } void FileManager::resume_upload(FileId file_id, std::vector bad_parts, std::shared_ptr callback, @@ -1811,37 +1906,6 @@ void FileManager::upload(FileId file_id, std::shared_ptr callbac return resume_upload(file_id, std::vector(), std::move(callback), new_priority, upload_order); } -// is't quite stupid, yep -// 0x00 -static string zero_decode(Slice s) { - string res; - for (size_t n = s.size(), i = 0; i < n; i++) { - if (i + 1 < n && s[i] == 0) { - res.append(static_cast(s[i + 1]), 0); - i++; - continue; - } - res.push_back(s[i]); - } - return res; -} - -static string zero_encode(Slice s) { - string res; - for (size_t n = s.size(), i = 0; i < n; i++) { - res.push_back(s[i]); - if (s[i] == 0) { - unsigned char cnt = 1; - while (cnt < 250 && i + cnt < n && s[i + cnt] == 0) { - cnt++; - } - res.push_back(cnt); - i += cnt - 1; - } - } - return res; -} - static bool is_document_type(FileType type) { return type == FileType::Document || type == FileType::Sticker || type == FileType::Audio || type == FileType::Animation; @@ -1965,6 +2029,7 @@ tl_object_ptr FileManager::get_file_object(FileId file_id, bool wi int32 size = narrow_cast(file_view.size()); int32 expected_size = narrow_cast(file_view.expected_size()); + int32 download_offset = narrow_cast(file_view.download_offset()); int32 local_size = narrow_cast(file_view.local_size()); int32 local_total_size = narrow_cast(file_view.local_total_size()); int32 remote_size = narrow_cast(file_view.remote_size()); @@ -1987,8 +2052,8 @@ tl_object_ptr FileManager::get_file_object(FileId file_id, bool wi return td_api::make_object( result_file_id.get(), size, expected_size, td_api::make_object(std::move(path), can_be_downloaded, can_be_deleted, - file_view.is_downloading(), file_view.has_local_location(), local_size, - local_total_size), + file_view.is_downloading(), file_view.has_local_location(), + download_offset, local_size, local_total_size), td_api::make_object(std::move(persistent_file_id), file_view.is_uploading(), is_uploading_completed, remote_size)); } @@ -2208,6 +2273,8 @@ void FileManager::on_partial_download(QueryId query_id, const PartialLocalFileLo auto file_id = query->file_id_; auto file_node = get_file_node(file_id); LOG(DEBUG) << "Receive on_parial_download for file " << file_id; + LOG(ERROR) << "Receive on_parital_generate for file " << file_id << ": " << partial_local.path_ << " " + << Bitmask(Bitmask::Decode{}, partial_local.ready_bitmask_); if (!file_node) { return; } @@ -2379,7 +2446,7 @@ void FileManager::on_partial_generate(QueryId query_id, const PartialLocalFileLo auto file_id = query->file_id_; auto file_node = get_file_node(file_id); LOG(DEBUG) << "Receive on_parital_generate for file " << file_id << ": " << partial_local.path_ << " " - << partial_local.ready_part_count_; + << Bitmask(Bitmask::Decode{}, partial_local.ready_bitmask_); if (!file_node) { return; } diff --git a/td/telegram/files/FileManager.h b/td/telegram/files/FileManager.h index 2fcdf31c..f508d6c7 100644 --- a/td/telegram/files/FileManager.h +++ b/td/telegram/files/FileManager.h @@ -56,8 +56,9 @@ class FileNode { , encryption_key_(std::move(key)) , main_file_id_(main_file_id) , main_file_id_priority_(main_file_id_priority) { + init_ready_size(); } - void set_local_location(const LocalFileLocation &local, int64 ready_size); + void set_local_location(const LocalFileLocation &local, int64 ready_size, Bitmask::ReadySize ready_prefix_size = {}); void set_remote_location(const RemoteFileLocation &remote, FileLocationSource source, int64 ready_size); void set_generate_location(unique_ptr &&generate); void set_size(int64 size); @@ -71,6 +72,9 @@ class FileNode { void set_upload_priority(int8 priority); void set_generate_priority(int8 download_priority, int8 upload_priority); + void set_download_offset(int64 download_offset); + void recalc_ready_prefix_size(Bitmask::ReadySize ready_prefix_size); + void on_changed(); void on_info_changed(); void on_pmc_changed(); @@ -90,6 +94,8 @@ class FileNode { LocalFileLocation local_; FileLoadManager::QueryId upload_id_ = 0; int64 local_ready_size_ = 0; + int64 download_offset_ = 0; + int64 local_ready_prefix_size_ = 0; RemoteFileLocation remote_; FileLoadManager::QueryId download_id_ = 0; @@ -118,6 +124,7 @@ class FileNode { int8 generate_upload_priority_ = 0; int8 main_file_id_priority_ = 0; + bool is_download_offset_dirty_ = false; FileLocationSource remote_source_ = FileLocationSource::FromUser; @@ -131,6 +138,9 @@ class FileNode { bool pmc_changed_flag_{false}; bool info_changed_flag_{false}; + bool offset_changed_flags_{false}; + + void init_ready_size(); }; class FileManager; @@ -209,6 +219,8 @@ class FileView { int64 size() const; int64 expected_size() const; bool is_downloading() const; + int64 download_offset() const; + int64 downloaded_prefix(int64 offset) const; int64 local_size() const; int64 local_total_size() const; bool is_uploading() const; @@ -327,7 +339,8 @@ class FileManager : public FileLoadManager::Callback { bool set_encryption_key(FileId file_id, FileEncryptionKey key); bool set_content(FileId file_id, BufferSlice bytes); - void download(FileId file_id, std::shared_ptr callback, int32 new_priority); + void download(FileId file_id, std::shared_ptr callback, int32 new_priority, int64 offset); + void download_set_offset(FileId file_id, int64 offset); void upload(FileId file_id, std::shared_ptr callback, int32 new_priority, uint64 upload_order); void resume_upload(FileId file_id, std::vector bad_parts, std::shared_ptr callback, int32 new_priority, uint64 upload_order); diff --git a/td/telegram/files/FileUploader.cpp b/td/telegram/files/FileUploader.cpp index 20fa13f4..9acb05e9 100644 --- a/td/telegram/files/FileUploader.cpp +++ b/td/telegram/files/FileUploader.cpp @@ -107,7 +107,9 @@ Result FileUploader::on_update_local_location(const Loca file_type = FileType::Temp; } else if (location.type() == LocalFileLocation::Type::Partial) { path = location.partial().path_; - local_size = static_cast(location.partial().part_size_) * location.partial().ready_part_count_; + local_size = Bitmask(Bitmask::Decode{}, location.partial().ready_bitmask_) + .get_ready_size(0, location.partial().part_size_) + .ready_size; local_is_ready = false; file_type = location.partial().file_type_; } else { @@ -292,8 +294,8 @@ Result FileUploader::process_part(Part part, NetQueryPtr net_query) { return part.size; } -void FileUploader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, - int64 ready_size) { +void FileUploader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, std::string bitmask, + bool is_ready, int64 ready_size) { callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_}, ready_size); if (is_ready) { diff --git a/td/telegram/files/FileUploader.h b/td/telegram/files/FileUploader.h index 567d2c88..9d846c4a 100644 --- a/td/telegram/files/FileUploader.h +++ b/td/telegram/files/FileUploader.h @@ -62,7 +62,8 @@ class FileUploader : public FileLoader { void after_start_parts() override; Result> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT; Result process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT; - void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, int64 ready_size) override; + void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask, bool is_ready, + int64 ready_size) override; FileLoader::Callback *get_callback() override; Result on_update_local_location(const LocalFileLocation &location) override TD_WARN_UNUSED_RESULT; diff --git a/td/telegram/files/PartsManager.cpp b/td/telegram/files/PartsManager.cpp index ebd72bc7..74500a74 100644 --- a/td/telegram/files/PartsManager.cpp +++ b/td/telegram/files/PartsManager.cpp @@ -29,6 +29,27 @@ Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, con return init_no_size(part_size, ready_parts); } +void PartsManager::set_streaming_offset(int64 offset) { + if (need_check_ || (!unknown_size_flag_ && get_size() < offset)) { + streaming_offset_ = 0; + return; + } + + auto part_i = offset / part_size_; + if (part_i > MAX_PART_COUNT || part_i < 0) { + streaming_offset_ = 0; + // error? + return; + } + + streaming_offset_ = offset; + first_streaming_empty_part_ = narrow_cast(part_i); + if (part_count_ <= first_streaming_empty_part_) { + part_count_ = first_streaming_empty_part_ + 1; + part_status_.resize(part_count_, PartStatus::Empty); + } +} + Status PartsManager::init_no_size(size_t part_size, const std::vector &ready_parts) { unknown_size_flag_ = true; size_ = 0; @@ -121,6 +142,14 @@ void PartsManager::update_first_empty_part() { while (first_empty_part_ < part_count_ && part_status_[first_empty_part_] != PartStatus::Empty) { first_empty_part_++; } + + if (streaming_offset_ == 0) { + first_streaming_empty_part_ = first_empty_part_; + return; + } + while (first_streaming_empty_part_ < part_count_ && part_status_[first_streaming_empty_part_] != PartStatus::Empty) { + first_streaming_empty_part_++; + } } void PartsManager::update_first_not_ready_part() { @@ -144,10 +173,14 @@ int32 PartsManager::get_ready_prefix_count() { } return res; } +string PartsManager::get_bitmask() const { + return bitmask_.encode(); +} Result PartsManager::start_part() { update_first_empty_part(); - if (first_empty_part_ == part_count_) { + auto part_i = first_streaming_empty_part_; + if (part_i == part_count_) { if (unknown_size_flag_) { if (known_prefix_flag_ == false) { part_count_++; @@ -159,13 +192,16 @@ Result PartsManager::start_part() { return Status::Error(1, "Wait for prefix to be known"); } } else { - return get_empty_part(); + if (first_empty_part_ < part_count_) { + part_i = first_empty_part_; + } else { + return get_empty_part(); + } } } - CHECK(part_status_[first_empty_part_] == PartStatus::Empty); - int id = first_empty_part_; - on_part_start(id); - return get_part(id); + CHECK(part_status_[part_i] == PartStatus::Empty); + on_part_start(part_i); + return get_part(part_i); } Status PartsManager::set_known_prefix(size_t size, bool is_ready) { @@ -200,6 +236,7 @@ Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) pending_count_--; part_status_[id] = PartStatus::Ready; + bitmask_.set(id); ready_size_ += narrow_cast(actual_size); VLOG(files) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_; @@ -241,6 +278,14 @@ void PartsManager::on_part_failed(int32 id) { if (id < first_empty_part_) { first_empty_part_ = id; } + if (streaming_offset_ == 0) { + first_streaming_empty_part_ = id; + return; + } + auto part_i = narrow_cast(streaming_offset_ / part_size_); + if (id >= part_i && id < first_streaming_empty_part_) { + first_streaming_empty_part_ = id; + } } int64 PartsManager::get_size() const { @@ -280,6 +325,7 @@ void PartsManager::init_common(const std::vector &ready_parts) { for (auto i : ready_parts) { CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_); part_status_[i] = PartStatus::Ready; + bitmask_.set(i); auto part = get_part(i); ready_size_ += narrow_cast(part.size); } @@ -289,6 +335,7 @@ void PartsManager::init_common(const std::vector &ready_parts) { void PartsManager::set_need_check() { need_check_ = true; + set_streaming_offset(0); } void PartsManager::set_checked_prefix_size(int64 size) { diff --git a/td/telegram/files/PartsManager.h b/td/telegram/files/PartsManager.h index 5a4bbf3f..22a1b2a9 100644 --- a/td/telegram/files/PartsManager.h +++ b/td/telegram/files/PartsManager.h @@ -9,6 +9,8 @@ #include "td/utils/common.h" #include "td/utils/Status.h" +#include "td/telegram/files/FileBitmask.h" + namespace td { /*** PartsManager***/ @@ -33,6 +35,7 @@ class PartsManager { Status set_known_prefix(size_t size, bool is_ready); void set_need_check(); void set_checked_prefix_size(int64 size); + void set_streaming_offset(int64 offset); int64 get_checked_prefix_size() const; int64 get_unchecked_ready_prefix_size(); @@ -44,6 +47,7 @@ class PartsManager { int32 get_part_count() const; int32 get_unchecked_ready_prefix_count(); int32 get_ready_prefix_count(); + string get_bitmask() const; private: static constexpr int MAX_PART_COUNT = 3000; @@ -70,7 +74,10 @@ class PartsManager { int pending_count_; int first_empty_part_; int first_not_ready_part_; + int64 streaming_offset_{0}; + int first_streaming_empty_part_; vector part_status_; + Bitmask bitmask_; bool use_part_count_limit_; void init_common(const vector &ready_parts); diff --git a/tdutils/td/utils/misc.cpp b/tdutils/td/utils/misc.cpp index a6816f79..95970429 100644 --- a/tdutils/td/utils/misc.cpp +++ b/tdutils/td/utils/misc.cpp @@ -125,5 +125,55 @@ string url_encode(Slice str) { CHECK(result.size() == length); return result; } +namespace detail { +template +string x_decode(Slice s, F &&f) { + string res; + for (size_t n = s.size(), i = 0; i < n; i++) { + if (i + 1 < n && f(s[i])) { + res.append(static_cast(s[i + 1]), s[i]); + i++; + continue; + } + res.push_back(s[i]); + } + return res; +} +template +string x_encode(Slice s, F &&f) { + string res; + for (size_t n = s.size(), i = 0; i < n; i++) { + res.push_back(s[i]); + if (f(s[i])) { + unsigned char cnt = 1; + while (cnt < 250 && i + cnt < n && s[i + cnt] == s[i]) { + cnt++; + } + res.push_back(cnt); + i += cnt - 1; + } + } + return res; +} +bool is_zero(uint8 c) { + return c == 0; +} +bool is_zero_or_one(uint8 c) { + return c == 0 || c == 0xff; +} +} // namespace detail + +std::string zero_encode(Slice data) { + return detail::x_encode(data, detail::is_zero); +} +std::string zero_decode(Slice data) { + return detail::x_decode(data, detail::is_zero); +} +std::string zero_one_encode(Slice data) { + return detail::x_encode(data, detail::is_zero_or_one); +} +std::string zero_one_decode(Slice data) { + return detail::x_decode(data, detail::is_zero_or_one); +} } // namespace td diff --git a/tdutils/td/utils/misc.h b/tdutils/td/utils/misc.h index d4a0013c..dbc1fc8b 100644 --- a/tdutils/td/utils/misc.h +++ b/tdutils/td/utils/misc.h @@ -380,4 +380,8 @@ detail::reversion_wrapper reversed(T &iterable) { return {iterable}; } +std::string zero_encode(Slice data); +std::string zero_decode(Slice data); +std::string zero_one_encode(Slice data); +std::string zero_one_decode(Slice data); } // namespace td diff --git a/test/tdclient.cpp b/test/tdclient.cpp index edd7eed4..e4d7b51f 100644 --- a/test/tdclient.cpp +++ b/test/tdclient.cpp @@ -612,7 +612,7 @@ class CheckTestC : public Task { if (text.substr(0, tag_.size()) == tag_) { file_id_to_check_ = messageDocument->document_->document_->id_; LOG(ERROR) << "GOT FILE " << to_string(messageDocument->document_->document_); - this->send_query(make_tl_object(file_id_to_check_, 1), + this->send_query(make_tl_object(file_id_to_check_, 1, 0), [](auto res) { check_td_error(res); }); } }