Files streaming

GitOrigin-RevId: 78df1cd44c95380cd4af46f4db809ce28876db1f
This commit is contained in:
Arseny Smirnov 2018-11-11 15:38:04 +04:00
parent 7ee12fd9a2
commit 3b238f6fba
28 changed files with 526 additions and 105 deletions

View File

@ -358,6 +358,7 @@ set(TDLIB_SOURCE
td/telegram/DialogParticipant.cpp
td/telegram/DocumentsManager.cpp
td/telegram/DraftMessage.cpp
td/telegram/files/FileBitmask.cpp
td/telegram/files/FileDb.cpp
td/telegram/files/FileDownloader.cpp
td/telegram/files/FileFromBytes.cpp
@ -478,6 +479,7 @@ set(TDLIB_SOURCE
td/telegram/DialogParticipant.h
td/telegram/DocumentsManager.h
td/telegram/DraftMessage.h
td/telegram/files/FileBitmask.h
td/telegram/files/FileDb.h
td/telegram/files/FileDownloader.h
td/telegram/files/FileFromBytes.h

View File

@ -126,9 +126,10 @@ temporaryPasswordState has_password:Bool valid_for:int32 = TemporaryPasswordStat
//@can_be_deleted True, if the file can be deleted
//@is_downloading_active True, if the file is currently being downloaded (or a local copy is being generated by some other means)
//@is_downloading_completed True, if the local copy is fully available
//@download_offset Download will be started from this offset. downloaded_prefix_size is calculated from this offset.
//@downloaded_prefix_size If is_downloading_completed is false, then only some prefix of the file is ready to be read. downloaded_prefix_size is the size of that prefix
//@downloaded_size Total downloaded file bytes. Should be used only for calculating download progress. The actual file size may be bigger, and some parts of it may contain garbage
localFile path:string can_be_downloaded:Bool can_be_deleted:Bool is_downloading_active:Bool is_downloading_completed:Bool downloaded_prefix_size:int32 downloaded_size:int32 = LocalFile;
localFile path:string can_be_downloaded:Bool can_be_deleted:Bool is_downloading_active:Bool is_downloading_completed:Bool download_offset:int32 downloaded_prefix_size:int32 downloaded_size:int32 = LocalFile;
//@description Represents a remote file
//@id Remote file identifier; may be empty. Can be used across application restarts or even from other devices for the current user. If the ID starts with "http://" or "https://", it represents the HTTP URL of the file. TDLib is currently unable to download files if only their URL is known.
@ -2948,7 +2949,18 @@ setPinnedChats chat_ids:vector<int53> = Ok;
//@description Asynchronously downloads a file from the cloud. updateFile will be used to notify about the download progress and successful completion of the download. Returns file state just after the download has been started
//@file_id Identifier of the file to download
//@priority Priority of the download (1-32). The higher the priority, the earlier the file will be downloaded. If the priorities of two files are equal, then the last one for which downloadFile was called will be downloaded first
downloadFile file_id:int32 priority:int32 = File;
//@offset File will be downloaded starting from offset first. Supposed to be used for streaming.
downloadFile file_id:int32 priority:int32 offset:int32 = File;
//@description Set offset for file downloading
//@file_id Identifier of file
//@offset File download offset
setFileDownloadOffset file_id:int32 offset:int32 = File;
//@description Get downloaded prefix from a given offset
//@file_id Identifier of file
//@offset Offset from which downloaded prefix is calculated
getFileDownloadedPrefix file_id:int32 offset:int32 = Count;
//@description Stops the downloading of a file. If a file has already been downloaded, does nothing @file_id Identifier of a file to stop downloading @only_if_pending Pass true to stop downloading only if it hasn't been started, i.e. request hasn't been sent to server
cancelDownloadFile file_id:int32 only_if_pending:Bool = Ok;

Binary file not shown.

View File

@ -822,9 +822,9 @@ static tl_object_ptr<T> copy(const tl_object_ptr<T> &obj) {
template <>
td_api::object_ptr<td_api::localFile> copy(const td_api::localFile &obj) {
return td_api::make_object<td_api::localFile>(obj.path_, obj.can_be_downloaded_, obj.can_be_deleted_,
obj.is_downloading_active_, obj.is_downloading_completed_,
obj.downloaded_prefix_size_, obj.downloaded_size_);
return td_api::make_object<td_api::localFile>(
obj.path_, obj.can_be_downloaded_, obj.can_be_deleted_, obj.is_downloading_active_, obj.is_downloading_completed_,
obj.download_offset_, obj.downloaded_prefix_size_, obj.downloaded_size_);
}
template <>
td_api::object_ptr<td_api::remoteFile> copy(const td_api::remoteFile &obj) {

View File

@ -6224,7 +6224,7 @@ void MessagesManager::load_secret_thumbnail(FileId thumbnail_file_id) {
});
send_closure(G()->file_manager(), &FileManager::download, thumbnail_file_id,
std::make_shared<Callback>(std::move(download_promise)), 1);
std::make_shared<Callback>(std::move(download_promise)), 1, -1);
}
void MessagesManager::on_upload_media(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file,

View File

@ -4858,6 +4858,15 @@ void Td::on_request(uint64 id, const td_api::getFile &request) {
send_closure(actor_id(this), &Td::send_result, id, file_manager_->get_file_object(FileId(request.file_id_, 0)));
}
void Td::on_request(uint64 id, const td_api::getFileDownloadedPrefix &request) {
auto file_view = file_manager_->get_file_view(FileId(request.file_id_, 0));
if (file_view.empty()) {
return send_closure(actor_id(this), &Td::send_error, id, Status::Error(10, "Unknown file id"));
}
send_closure(actor_id(this), &Td::send_result, id,
td_api::make_object<td_api::count>(static_cast<int32>(file_view.downloaded_prefix(request.offset_))));
}
void Td::on_request(uint64 id, td_api::getRemoteFile &request) {
CLEAN_INPUT_STRING(request.remote_file_id_);
auto r_file_id = file_manager_->from_persistent_id(
@ -5668,7 +5677,7 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) {
if (!(1 <= priority && priority <= 32)) {
return send_error_raw(id, 5, "Download priority must be in [1;32] range");
}
file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority);
file_manager_->download(FileId(request.file_id_, 0), download_file_callback_, priority, request.offset_);
auto file = file_manager_->get_file_object(FileId(request.file_id_, 0), false);
if (file->id_ == 0) {
@ -5678,8 +5687,18 @@ void Td::on_request(uint64 id, const td_api::downloadFile &request) {
send_closure(actor_id(this), &Td::send_result, id, std::move(file));
}
void Td::on_request(uint64 id, const td_api::setFileDownloadOffset &request) {
file_manager_->download_set_offset(FileId(request.file_id_, 0), request.offset_);
auto file = file_manager_->get_file_object(FileId(request.file_id_, 0), false);
if (file->id_ == 0) {
return send_error_raw(id, 400, "Invalid file id");
}
send_closure(actor_id(this), &Td::send_result, id, std::move(file));
}
void Td::on_request(uint64 id, const td_api::cancelDownloadFile &request) {
file_manager_->download(FileId(request.file_id_, 0), nullptr, request.only_if_pending_ ? -1 : 0);
file_manager_->download(FileId(request.file_id_, 0), nullptr, 0, request.only_if_pending_ ? -1 : 0);
send_closure(actor_id(this), &Td::send_result, id, make_tl_object<td_api::ok>());
}

View File

@ -442,6 +442,8 @@ class Td final : public NetQueryCallback {
void on_request(uint64 id, const td_api::getFile &request);
void on_request(uint64 id, const td_api::getFileDownloadedPrefix &request);
void on_request(uint64 id, td_api::getRemoteFile &request);
void on_request(uint64 id, td_api::getStorageStatistics &request);
@ -644,6 +646,8 @@ class Td final : public NetQueryCallback {
void on_request(uint64 id, const td_api::downloadFile &request);
void on_request(uint64 id, const td_api::setFileDownloadOffset &request);
void on_request(uint64 id, const td_api::cancelDownloadFile &request);
void on_request(uint64 id, td_api::uploadFile &request);

View File

@ -2251,6 +2251,11 @@ class CliClient final : public Actor {
send_request(make_tl_object<td_api::getChatMessageByDate>(as_chat_id(chat_id), to_integer<int32>(date)));
} else if (op == "gf" || op == "GetFile") {
send_request(make_tl_object<td_api::getFile>(as_file_id(args)));
} else if (op == "gfp" || op == "GetFileDownloadedPrefix") {
string file_id;
string offset;
std::tie(file_id, offset) = split(args);
send_request(make_tl_object<td_api::getFileDownloadedPrefix>(as_file_id(file_id), to_integer<int32>(offset)));
} else if (op == "grf") {
send_request(make_tl_object<td_api::getRemoteFile>(args, nullptr));
} else if (op == "gmtf") {
@ -2271,26 +2276,38 @@ class CliClient final : public Actor {
send_request(make_tl_object<td_api::getMapThumbnailFile>(
as_location(latitude, longitude), to_integer<int32>(zoom), to_integer<int32>(width),
to_integer<int32>(height), to_integer<int32>(scale), as_chat_id(chat_id)));
} else if (op == "sfdo" || op == "SetDownloadFileOffset") {
string file_id_str;
string offset;
std::tie(file_id_str, offset) = split(args);
auto file_id = as_file_id(file_id_str);
send_request(make_tl_object<td_api::setFileDownloadOffset>(file_id, to_integer<int32>(offset)));
} else if (op == "df" || op == "DownloadFile") {
string file_id_str;
string priority;
std::tie(file_id_str, priority) = split(args);
string offset;
std::tie(file_id_str, args) = split(args);
std::tie(priority, offset) = split(args);
if (priority.empty()) {
priority = "1";
}
auto file_id = as_file_id(file_id_str);
send_request(make_tl_object<td_api::downloadFile>(file_id, to_integer<int32>(priority)));
send_request(
make_tl_object<td_api::downloadFile>(file_id, to_integer<int32>(priority), to_integer<int32>(offset)));
} else if (op == "dff") {
string file_id;
string priority;
std::tie(file_id, priority) = split(args);
string offset;
std::tie(file_id, args) = split(args);
std::tie(priority, offset) = split(args);
if (priority.empty()) {
priority = "1";
}
for (int i = 1; i <= as_file_id(file_id); i++) {
send_request(make_tl_object<td_api::downloadFile>(i, to_integer<int32>(priority)));
send_request(make_tl_object<td_api::downloadFile>(i, to_integer<int32>(priority), to_integer<int32>(offset)));
}
} else if (op == "cdf") {
send_request(make_tl_object<td_api::cancelDownloadFile>(as_file_id(args), false));

View File

@ -0,0 +1,84 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/FileBitmask.h"
#include "td/utils/misc.h"
namespace td {
Bitmask::Bitmask(Decode, Slice data) : data_(zero_one_decode(data)) {
}
Bitmask::Bitmask(Ones, int64 count) : data_((count + 7) / 8, '\0') {
for (int64 i = 0; i < count; i++) {
set(i);
}
}
std::string Bitmask::encode() const {
// remove zeroes in the end to make encoding deteministic
td::Slice data(data_);
while (!data.empty() && data.back() == 0) {
data.remove_suffix(1);
}
return zero_one_encode(data_);
}
Bitmask::ReadySize Bitmask::get_ready_size(int64 offset, int64 part_size) const {
ReadySize res;
res.offset = offset;
auto offset_part = offset / part_size;
auto ones = get_ready_parts(offset_part);
if (ones == 0) {
res.ready_size = 0;
} else {
res.ready_size = (offset_part + ones) * part_size - offset;
}
CHECK(res.ready_size >= 0);
return res;
}
int64 Bitmask::get_total_size(int64 part_size) const {
int64 res = 0;
for (int64 i = 0; i < size(); i++) {
res += get(i);
}
return res * part_size;
}
bool Bitmask::get(int64 offset) const {
if (offset < 0) {
return 0;
}
if (offset / 8 >= narrow_cast<int64>(data_.size())) {
return 0;
}
return (data_[offset / 8] & (1 << (offset % 8))) != 0;
}
int64 Bitmask::get_ready_parts(int64 offset) const {
int64 res = 0;
while (get(offset + res)) {
res++;
}
return res;
};
std::vector<int32> Bitmask::as_vector() const {
std::vector<int32> res;
for (int32 i = 0; i < narrow_cast<int32>(data_.size() * 8); i++) {
if (get(i)) {
res.push_back(i);
}
}
return res;
}
void Bitmask::set(int64 offset) {
auto need_size = narrow_cast<size_t>(offset / 8 + 1);
if (need_size > data_.size()) {
data_.resize(need_size, 0);
}
data_[need_size - 1] |= (1 << (offset % 8));
}
int64 Bitmask::size() const {
return data_.size() * 8;
}
} // namespace td

View File

@ -0,0 +1,50 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/Slice.h"
#include "td/utils/StringBuilder.h"
namespace td {
class Bitmask {
public:
struct ReadySize {
int64 offset{-1};
int64 ready_size{-1};
bool empty() const {
return offset == -1;
}
};
struct Decode {};
struct Ones {};
Bitmask() = default;
Bitmask(Decode, Slice data);
Bitmask(Ones, int64 count);
std::string encode() const;
ReadySize get_ready_size(int64 offset, int64 part_size) const;
int64 get_total_size(int64 part_size) const;
bool get(int64 offset) const;
int64 get_ready_parts(int64 offset) const;
std::vector<int32> as_vector() const;
void set(int64 offset);
int64 size() const;
private:
std::string data_;
};
inline StringBuilder &operator<<(StringBuilder &sb, const Bitmask &mask) {
std::string res;
for (int64 i = 0; i < mask.size(); i++) {
res += mask.get(i) ? "1" : "0";
}
return sb << res;
}
} // namespace td

View File

@ -32,7 +32,7 @@ namespace td {
FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size,
string name, const FileEncryptionKey &encryption_key, bool is_small, bool search_file,
unique_ptr<Callback> callback)
int64 offset, unique_ptr<Callback> callback)
: remote_(remote)
, local_(local)
, size_(size)
@ -40,10 +40,14 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local
, encryption_key_(encryption_key)
, callback_(std::move(callback))
, is_small_(is_small)
, search_file_(search_file) {
, search_file_(search_file)
, offset_(offset) {
if (encryption_key.is_secret()) {
set_ordered_flag(true);
}
if (!encryption_key.empty()) {
CHECK(offset_ == 0);
}
}
Result<FileLoader::FileInfo> FileDownloader::init() {
@ -59,22 +63,22 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
if (remote_.file_type_ == FileType::Secure) {
size_ = 0;
}
int ready_part_count = 0;
int32 part_size = 0;
Bitmask bitmask{Bitmask::Ones{}, 0};
if (local_.type() == LocalFileLocation::Type::Partial) {
const auto &partial = local_.partial();
path_ = partial.path_;
auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read);
// TODO: check timestamps..
if (result_fd.is_ok()) {
bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_);
if (encryption_key_.is_secret()) {
CHECK(partial.iv_.size() == 32) << partial.iv_.size();
encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data());
next_part_ = partial.ready_part_count_;
next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0));
}
fd_ = result_fd.move_as_ok();
part_size = partial.part_size_;
ready_part_count = partial.ready_part_count_;
}
}
if (search_file_ && fd_.empty() && size_ > 0 && size_ < 1000 * (1 << 20) && encryption_key_.empty() &&
@ -88,27 +92,23 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
need_check_ = true;
only_check_ = true;
part_size = 32 * (1 << 10);
ready_part_count = narrow_cast<int>((size_ + part_size - 1) / part_size);
bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size};
return Status::OK();
}();
}
std::vector<int> parts(ready_part_count);
for (int i = 0; i < ready_part_count; i++) {
parts[i] = i;
}
FileInfo res;
res.size = size_;
res.is_size_final = true;
res.part_size = part_size;
res.ready_parts = std::move(parts);
res.ready_parts = bitmask.as_vector();
res.use_part_count_limit = false;
res.only_check = only_check_;
res.need_delay = !is_small_ && (remote_.file_type_ == FileType::VideoNote ||
remote_.file_type_ == FileType::VoiceNote || remote_.file_type_ == FileType::Audio ||
remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation ||
(remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20)));
res.offset = offset_;
return res;
}
Status FileDownloader::on_ok(int64 size) {
@ -345,8 +345,8 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
}
return written;
}
void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready,
int64 ready_size) {
void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask,
bool is_ready, int64 ready_size) {
if (is_ready) {
// do not send partial location. will lead to wrong local_size
return;
@ -355,7 +355,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_
return;
}
if (encryption_key_.empty() || encryption_key_.is_secure()) {
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, path_, part_size, ready_part_count, ""},
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, path_, part_size, "", ready_bitmask},
ready_size);
} else if (encryption_key_.is_secret()) {
UInt256 iv;
@ -365,8 +365,7 @@ void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_
LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_);
}
callback_->on_partial_download(
PartialLocalFileLocation{remote_.file_type_, path_, part_size, ready_part_count, as_slice(iv).str()},
ready_size);
PartialLocalFileLocation{remote_.file_type_, path_, part_size, as_slice(iv).str(), ready_bitmask}, ready_size);
} else {
UNREACHABLE();
}

View File

@ -33,7 +33,7 @@ class FileDownloader : public FileLoader {
};
FileDownloader(const FullRemoteFileLocation &remote, const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool is_small, bool search_file,
const FileEncryptionKey &encryption_key, bool is_small, bool search_file, int64 offset,
unique_ptr<Callback> callback);
// Should just implement all parent pure virtual methods.
@ -56,6 +56,7 @@ class FileDownloader : public FileLoader {
bool next_part_stop_ = false;
bool is_small_;
bool search_file_{false};
int64 offset_;
bool use_cdn_ = false;
DcId cdn_dc_id_;
@ -84,7 +85,8 @@ class FileDownloader : public FileLoader {
Result<bool> should_restart_part(Part part, NetQueryPtr &net_query) override TD_WARN_UNUSED_RESULT;
Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, int64 ready_size) override;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask, bool is_ready,
int64 ready_size) override;
FileLoader::Callback *get_callback() override;
Status process_check_query(NetQueryPtr net_query) override;
Result<CheckInfo> check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) override;

View File

@ -83,10 +83,11 @@ class FileDownloadGenerateActor : public FileGenerateActor {
ActorId<FileDownloadGenerateActor> parent_;
};
send_closure(G()->file_manager(), &FileManager::download, file_id_, std::make_shared<Callback>(actor_id(this)), 1);
send_closure(G()->file_manager(), &FileManager::download, file_id_, std::make_shared<Callback>(actor_id(this)), 1,
-1);
}
void hangup() override {
send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0);
send_closure(G()->file_manager(), &FileManager::download, file_id_, nullptr, 0, 0);
stop();
}
@ -302,8 +303,9 @@ class FileExternalGenerateActor : public FileGenerateActor {
if (local_prefix_size < 0) {
return Status::Error(1, "Invalid local prefix size");
}
callback_->on_partial_generate(
PartialLocalFileLocation{generate_location_.file_type_, path_, 1, local_prefix_size, ""}, expected_size);
callback_->on_partial_generate(PartialLocalFileLocation{generate_location_.file_type_, path_, local_prefix_size,
Bitmask(Bitmask::Ones{}, 1).encode(), ""},
expected_size);
return Status::OK();
}

View File

@ -39,7 +39,7 @@ ActorOwn<ResourceManager> &FileLoadManager::get_download_resource_manager(bool i
void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_location,
const LocalFileLocation &local, int64 size, string name,
const FileEncryptionKey &encryption_key, bool search_file, int8 priority) {
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority) {
if (stop_flag_) {
return;
}
@ -51,7 +51,7 @@ void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_
auto callback = make_unique<FileDownloaderCallback>(actor_shared(this, node_id));
bool is_small = size < 20 * 1024;
node->loader_ = create_actor<FileDownloader>("Downloader", remote_location, local, size, std::move(name),
encryption_key, is_small, search_file, std::move(callback));
encryption_key, is_small, search_file, offset, std::move(callback));
DcId dc_id = remote_location.is_web() ? G()->get_webfile_dc_id() : remote_location.get_dc_id();
auto &resource_manager = get_download_resource_manager(is_small, dc_id);
send_closure(resource_manager, &ResourceManager::register_worker,
@ -156,6 +156,20 @@ void FileLoadManager::update_local_file_location(QueryId id, const LocalFileLoca
}
send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local);
}
void FileLoadManager::update_download_offset(QueryId id, int64 offset) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_download_offset, offset);
}
void FileLoadManager::hangup() {
nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); });
stop_flag_ = true;

View File

@ -46,7 +46,7 @@ class FileLoadManager final : public Actor {
explicit FileLoadManager(ActorShared<Callback> callback, ActorShared<> parent);
void download(QueryId id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size,
string name, const FileEncryptionKey &encryption_key, bool search_file, int8 priority);
string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int8 priority);
void upload(QueryId id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
int64 size, const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts);
void upload_by_hash(QueryId id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
@ -54,6 +54,7 @@ class FileLoadManager final : public Actor {
void from_bytes(QueryId id, FileType type, BufferSlice bytes, string name);
void cancel(QueryId id);
void update_local_file_location(QueryId id, const LocalFileLocation &local);
void update_download_offset(QueryId id, int64 offset);
void get_content(const FullLocalFileLocation &local_location, Promise<BufferSlice> promise);
private:

View File

@ -63,6 +63,15 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) {
loop();
}
void FileLoader::update_download_offset(int64 offset) {
parts_manager_.set_streaming_offset(offset);
//TODO: cancel only some queries
for (auto &it : part_map_) {
it.second.second.reset(); // cancel_query(it.second.second);
}
loop();
}
void FileLoader::start_up() {
auto r_file_info = init();
if (r_file_info.is_error()) {
@ -78,14 +87,15 @@ void FileLoader::start_up() {
auto &ready_parts = file_info.ready_parts;
auto use_part_count_limit = file_info.use_part_count_limit;
auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit);
if (file_info.only_check) {
parts_manager_.set_checked_prefix_size(0);
}
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
if (file_info.only_check) {
parts_manager_.set_checked_prefix_size(0);
}
parts_manager_.set_streaming_offset(file_info.offset);
if (ordered_flag_) {
ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
}
@ -289,6 +299,7 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
void FileLoader::on_progress_impl(size_t size) {
on_progress(parts_manager_.get_part_count(), static_cast<int32>(parts_manager_.get_part_size()),
parts_manager_.get_ready_prefix_count(), parts_manager_.ready(), parts_manager_.get_ready_size());
parts_manager_.get_ready_prefix_count(), parts_manager_.get_bitmask(), parts_manager_.ready(),
parts_manager_.get_ready_size());
}
} // namespace td

View File

@ -38,6 +38,7 @@ class FileLoader : public FileLoaderActor {
void update_resources(const ResourceState &other) override;
void update_local_file_location(const LocalFileLocation &local) override;
void update_download_offset(int64 offset) override;
protected:
void set_ordered_flag(bool flag);
@ -49,13 +50,14 @@ class FileLoader : public FileLoaderActor {
};
struct FileInfo {
int64 size;
int64 expected_size = 0;
int64 expected_size{0};
bool is_size_final;
int32 part_size;
std::vector<int> ready_parts;
bool use_part_count_limit = true;
bool only_check = false;
bool need_delay = false;
int64 offset{0};
};
virtual Result<FileInfo> init() TD_WARN_UNUSED_RESULT = 0;
virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0;
@ -67,8 +69,8 @@ class FileLoader : public FileLoaderActor {
virtual void after_start_parts() {
}
virtual Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT = 0;
virtual void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready,
int64 ready_size) = 0;
virtual void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, std::string ready_bitmask,
bool is_ready, int64 ready_size) = 0;
virtual Callback *get_callback() = 0;
virtual Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location) TD_WARN_UNUSED_RESULT {
return Status::Error("unsupported");

View File

@ -22,9 +22,11 @@ class FileLoaderActor : public NetQueryCallback {
virtual void update_priority(int8 priority) = 0;
virtual void update_resources(const ResourceState &other) = 0;
// TODO: existence of this function is a dirty hack. Refactoring is highly appreciated
// TODO: existence of this two functions is a dirty hack. Refactoring is highly appreciated
virtual void update_local_file_location(const LocalFileLocation &local) {
}
virtual void update_download_offset(int64 offset) {
}
};
} // namespace td

View File

@ -6,6 +6,7 @@
//
#pragma once
#include "td/telegram/files/FileBitmask.h"
#include "td/telegram/td_api.h"
#include "td/telegram/telegram_api.h"
@ -909,8 +910,8 @@ struct PartialLocalFileLocation {
FileType file_type_;
string path_;
int32 part_size_;
int32 ready_part_count_;
string iv_;
string ready_bitmask_;
template <class StorerT>
void store(StorerT &storer) const {
@ -918,8 +919,10 @@ struct PartialLocalFileLocation {
store(file_type_, storer);
store(path_, storer);
store(part_size_, storer);
store(ready_part_count_, storer);
int32 deprecated_ready_part_count = -1;
store(deprecated_ready_part_count, storer);
store(iv_, storer);
store(ready_bitmask_, storer);
}
template <class ParserT>
void parse(ParserT &parser) {
@ -930,14 +933,20 @@ struct PartialLocalFileLocation {
}
parse(path_, parser);
parse(part_size_, parser);
parse(ready_part_count_, parser);
int32 deprecated_ready_part_count;
parse(deprecated_ready_part_count, parser);
parse(iv_, parser);
if (deprecated_ready_part_count == -1) {
parse(ready_bitmask_, parser);
} else {
ready_bitmask_ = Bitmask(Bitmask::Ones{}, deprecated_ready_part_count).encode();
}
}
};
inline bool operator==(const PartialLocalFileLocation &lhs, const PartialLocalFileLocation &rhs) {
return lhs.file_type_ == rhs.file_type_ && lhs.path_ == rhs.path_ && lhs.part_size_ == rhs.part_size_ &&
lhs.ready_part_count_ == rhs.ready_part_count_ && lhs.iv_ == rhs.iv_;
lhs.iv_ == rhs.iv_ && lhs.ready_bitmask_ == rhs.ready_bitmask_;
}
inline bool operator!=(const PartialLocalFileLocation &lhs, const PartialLocalFileLocation &rhs) {

View File

@ -33,6 +33,9 @@
#include <utility>
namespace td {
namespace {
constexpr int64 MAX_FILE_SIZE = 1500 * (1 << 20) /* 1500MB */;
}
int VERBOSITY_NAME(update_file) = VERBOSITY_NAME(DEBUG);
@ -63,7 +66,48 @@ FileNodePtr::operator bool() const {
return file_manager_ != nullptr && get_unsafe() != nullptr;
}
void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_size) {
void FileNode::recalc_ready_prefix_size(Bitmask::ReadySize ready_prefix_size) {
if (local_.type() != LocalFileLocation::Type::Partial) {
return;
}
int64 new_local_ready_prefix_size;
if (download_offset_ == ready_prefix_size.offset) {
new_local_ready_prefix_size = ready_prefix_size.ready_size;
} else {
new_local_ready_prefix_size = Bitmask(Bitmask::Decode{}, local_.partial().ready_bitmask_)
.get_ready_size(download_offset_, local_.partial().part_size_)
.ready_size;
}
if (new_local_ready_prefix_size != local_ready_prefix_size_) {
local_ready_prefix_size_ = new_local_ready_prefix_size;
on_info_changed();
}
}
void FileNode::init_ready_size() {
if (local_.type() != LocalFileLocation::Type::Partial) {
return;
}
auto bitmask = Bitmask(Bitmask::Decode{}, local_.partial().ready_bitmask_);
local_ready_prefix_size_ = bitmask.get_ready_size(0, local_.partial().part_size_).ready_size;
local_ready_size_ = bitmask.get_total_size(local_.partial().part_size_);
}
void FileNode::set_download_offset(int64 download_offset) {
if (download_offset < 0 || download_offset > MAX_FILE_SIZE) {
return;
}
if (download_offset == download_offset_) {
return;
}
download_offset_ = download_offset;
is_download_offset_dirty_ = true;
recalc_ready_prefix_size({});
on_info_changed();
}
void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_size,
Bitmask::ReadySize ready_prefix_size) {
if (local_ready_size_ != ready_size) {
local_ready_size_ = ready_size;
VLOG(update_file) << "File " << main_file_id_ << " has changed local ready size";
@ -72,6 +116,9 @@ void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_si
if (local_ != local) {
VLOG(update_file) << "File " << main_file_id_ << " has changed local location";
local_ = local;
recalc_ready_prefix_size(ready_prefix_size);
on_changed();
}
}
@ -297,6 +344,28 @@ bool FileView::is_downloading() const {
return node_->download_priority_ != 0 || node_->generate_download_priority_ != 0;
}
int64 FileView::download_offset() const {
return node_->download_offset_;
}
int64 FileView::downloaded_prefix(int64 offset) const {
switch (node_->local_.type()) {
case LocalFileLocation::Type::Empty:
return 0;
case LocalFileLocation::Type::Full:
if (offset < node_->size_) {
return node_->size_ - offset;
}
return 0;
case LocalFileLocation::Type::Partial:
return Bitmask(Bitmask::Decode{}, node_->local_.partial().ready_bitmask_)
.get_ready_size(offset, node_->local_.partial().part_size_)
.ready_size;
default:
UNREACHABLE();
return 0;
}
}
int64 FileView::local_size() const {
switch (node_->local_.type()) {
case LocalFileLocation::Type::Full:
@ -306,7 +375,7 @@ int64 FileView::local_size() const {
// File is not decrypted yet
return 0;
}
return node_->local_.partial().part_size_ * node_->local_.partial().ready_part_count_;
return node_->local_ready_prefix_size_;
}
default:
return 0;
@ -319,8 +388,7 @@ int64 FileView::local_total_size() const {
case LocalFileLocation::Type::Full:
return node_->size_;
case LocalFileLocation::Type::Partial:
return max(static_cast<int64>(node_->local_.partial().part_size_) * node_->local_.partial().ready_part_count_,
node_->local_ready_size_);
return max(node_->local_ready_prefix_size_, node_->local_ready_size_);
default:
UNREACHABLE();
return 0;
@ -528,7 +596,6 @@ string FileManager::get_file_name(FileType file_type, Slice path) {
Status FileManager::check_local_location(FullLocalFileLocation &location, int64 &size) {
constexpr int64 MAX_THUMBNAIL_SIZE = 200 * (1 << 10) /* 200KB */;
constexpr int64 MAX_FILE_SIZE = 1500 * (1 << 20) /* 1500MB */;
if (location.path_.empty()) {
return Status::Error("File must have non-empty path");
@ -1051,9 +1118,11 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
node->download_id_ = other_node->download_id_;
node->is_download_started_ |= other_node->is_download_started_;
node->set_download_priority(other_node->download_priority_);
node->set_download_offset(other_node->download_offset_);
other_node->download_id_ = 0;
other_node->is_download_started_ = false;
other_node->download_priority_ = 0;
other_node->download_offset_ = 0;
//cancel_generate(node);
//node->set_generate_location(std::move(other_node->generate_));
@ -1449,7 +1518,8 @@ void FileManager::delete_file(FileId file_id, Promise<Unit> promise, const char
promise.set_value(Unit());
}
void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority) {
void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority,
int64 offset) {
LOG(INFO) << "Download file " << file_id << " with priority " << new_priority;
auto node = get_sync_file_node(file_id);
if (!node) {
@ -1496,7 +1566,7 @@ void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> cal
}
LOG(INFO) << "Change download priority of file " << file_id << " to " << new_priority;
node->set_download_offset(offset);
auto *file_info = get_file_id_info(file_id);
CHECK(new_priority == 0 || callback);
file_info->download_priority_ = narrow_cast<int8>(new_priority);
@ -1509,6 +1579,21 @@ void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> cal
try_flush_node(node);
}
void FileManager::download_set_offset(FileId file_id, int64 offset) {
auto file_node = get_sync_file_node(file_id);
if (!file_node) {
LOG(INFO) << "File " << file_id << " not found";
return;
}
if (FileView(file_node).is_encrypted()) {
offset = 0;
}
file_node->set_download_offset(offset);
run_generate(file_node);
run_download(file_node);
try_flush_node(file_node);
}
void FileManager::run_download(FileNodePtr node) {
if (node->need_load_from_pmc_) {
return;
@ -1537,10 +1622,19 @@ void FileManager::run_download(FileNodePtr node) {
}
return;
}
if (file_view.is_encrypted()) {
node->set_download_offset(0);
}
auto offset = node->download_offset_;
bool need_update_offset = node->is_download_offset_dirty_;
node->is_download_offset_dirty_ = false;
if (old_priority != 0) {
CHECK(node->download_id_ != 0);
send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority);
if (need_update_offset) {
send_closure(file_load_manager_, &FileLoadManager::update_download_offset, node->download_id_, offset);
}
return;
}
@ -1553,7 +1647,8 @@ void FileManager::run_download(FileNodePtr node) {
LOG(DEBUG) << "Run download of file " << file_id << " of size " << node->size_ << " from " << node->remote_.full()
<< " with suggested name " << node->suggested_name() << " and encyption key " << node->encryption_key_;
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full(), node->local_, node->size_,
node->suggested_name(), node->encryption_key_, node->can_search_locally_, priority);
node->suggested_name(), node->encryption_key_, node->can_search_locally_, node->download_offset_,
priority);
}
void FileManager::resume_upload(FileId file_id, std::vector<int> bad_parts, std::shared_ptr<UploadCallback> callback,
@ -1811,37 +1906,6 @@ void FileManager::upload(FileId file_id, std::shared_ptr<UploadCallback> callbac
return resume_upload(file_id, std::vector<int>(), std::move(callback), new_priority, upload_order);
}
// is't quite stupid, yep
// 0x00 <count of zeroes>
static string zero_decode(Slice s) {
string res;
for (size_t n = s.size(), i = 0; i < n; i++) {
if (i + 1 < n && s[i] == 0) {
res.append(static_cast<unsigned char>(s[i + 1]), 0);
i++;
continue;
}
res.push_back(s[i]);
}
return res;
}
static string zero_encode(Slice s) {
string res;
for (size_t n = s.size(), i = 0; i < n; i++) {
res.push_back(s[i]);
if (s[i] == 0) {
unsigned char cnt = 1;
while (cnt < 250 && i + cnt < n && s[i + cnt] == 0) {
cnt++;
}
res.push_back(cnt);
i += cnt - 1;
}
}
return res;
}
static bool is_document_type(FileType type) {
return type == FileType::Document || type == FileType::Sticker || type == FileType::Audio ||
type == FileType::Animation;
@ -1965,6 +2029,7 @@ tl_object_ptr<td_api::file> FileManager::get_file_object(FileId file_id, bool wi
int32 size = narrow_cast<int32>(file_view.size());
int32 expected_size = narrow_cast<int32>(file_view.expected_size());
int32 download_offset = narrow_cast<int32>(file_view.download_offset());
int32 local_size = narrow_cast<int32>(file_view.local_size());
int32 local_total_size = narrow_cast<int32>(file_view.local_total_size());
int32 remote_size = narrow_cast<int32>(file_view.remote_size());
@ -1987,8 +2052,8 @@ tl_object_ptr<td_api::file> FileManager::get_file_object(FileId file_id, bool wi
return td_api::make_object<td_api::file>(
result_file_id.get(), size, expected_size,
td_api::make_object<td_api::localFile>(std::move(path), can_be_downloaded, can_be_deleted,
file_view.is_downloading(), file_view.has_local_location(), local_size,
local_total_size),
file_view.is_downloading(), file_view.has_local_location(),
download_offset, local_size, local_total_size),
td_api::make_object<td_api::remoteFile>(std::move(persistent_file_id), file_view.is_uploading(),
is_uploading_completed, remote_size));
}
@ -2208,6 +2273,8 @@ void FileManager::on_partial_download(QueryId query_id, const PartialLocalFileLo
auto file_id = query->file_id_;
auto file_node = get_file_node(file_id);
LOG(DEBUG) << "Receive on_parial_download for file " << file_id;
LOG(ERROR) << "Receive on_parital_generate for file " << file_id << ": " << partial_local.path_ << " "
<< Bitmask(Bitmask::Decode{}, partial_local.ready_bitmask_);
if (!file_node) {
return;
}
@ -2379,7 +2446,7 @@ void FileManager::on_partial_generate(QueryId query_id, const PartialLocalFileLo
auto file_id = query->file_id_;
auto file_node = get_file_node(file_id);
LOG(DEBUG) << "Receive on_parital_generate for file " << file_id << ": " << partial_local.path_ << " "
<< partial_local.ready_part_count_;
<< Bitmask(Bitmask::Decode{}, partial_local.ready_bitmask_);
if (!file_node) {
return;
}

View File

@ -56,8 +56,9 @@ class FileNode {
, encryption_key_(std::move(key))
, main_file_id_(main_file_id)
, main_file_id_priority_(main_file_id_priority) {
init_ready_size();
}
void set_local_location(const LocalFileLocation &local, int64 ready_size);
void set_local_location(const LocalFileLocation &local, int64 ready_size, Bitmask::ReadySize ready_prefix_size = {});
void set_remote_location(const RemoteFileLocation &remote, FileLocationSource source, int64 ready_size);
void set_generate_location(unique_ptr<FullGenerateFileLocation> &&generate);
void set_size(int64 size);
@ -71,6 +72,9 @@ class FileNode {
void set_upload_priority(int8 priority);
void set_generate_priority(int8 download_priority, int8 upload_priority);
void set_download_offset(int64 download_offset);
void recalc_ready_prefix_size(Bitmask::ReadySize ready_prefix_size);
void on_changed();
void on_info_changed();
void on_pmc_changed();
@ -90,6 +94,8 @@ class FileNode {
LocalFileLocation local_;
FileLoadManager::QueryId upload_id_ = 0;
int64 local_ready_size_ = 0;
int64 download_offset_ = 0;
int64 local_ready_prefix_size_ = 0;
RemoteFileLocation remote_;
FileLoadManager::QueryId download_id_ = 0;
@ -118,6 +124,7 @@ class FileNode {
int8 generate_upload_priority_ = 0;
int8 main_file_id_priority_ = 0;
bool is_download_offset_dirty_ = false;
FileLocationSource remote_source_ = FileLocationSource::FromUser;
@ -131,6 +138,9 @@ class FileNode {
bool pmc_changed_flag_{false};
bool info_changed_flag_{false};
bool offset_changed_flags_{false};
void init_ready_size();
};
class FileManager;
@ -209,6 +219,8 @@ class FileView {
int64 size() const;
int64 expected_size() const;
bool is_downloading() const;
int64 download_offset() const;
int64 downloaded_prefix(int64 offset) const;
int64 local_size() const;
int64 local_total_size() const;
bool is_uploading() const;
@ -327,7 +339,8 @@ class FileManager : public FileLoadManager::Callback {
bool set_encryption_key(FileId file_id, FileEncryptionKey key);
bool set_content(FileId file_id, BufferSlice bytes);
void download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority);
void download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority, int64 offset);
void download_set_offset(FileId file_id, int64 offset);
void upload(FileId file_id, std::shared_ptr<UploadCallback> callback, int32 new_priority, uint64 upload_order);
void resume_upload(FileId file_id, std::vector<int> bad_parts, std::shared_ptr<UploadCallback> callback,
int32 new_priority, uint64 upload_order);

View File

@ -107,7 +107,9 @@ Result<FileLoader::PrefixInfo> FileUploader::on_update_local_location(const Loca
file_type = FileType::Temp;
} else if (location.type() == LocalFileLocation::Type::Partial) {
path = location.partial().path_;
local_size = static_cast<int64>(location.partial().part_size_) * location.partial().ready_part_count_;
local_size = Bitmask(Bitmask::Decode{}, location.partial().ready_bitmask_)
.get_ready_size(0, location.partial().part_size_)
.ready_size;
local_is_ready = false;
file_type = location.partial().file_type_;
} else {
@ -292,8 +294,8 @@ Result<size_t> FileUploader::process_part(Part part, NetQueryPtr net_query) {
return part.size;
}
void FileUploader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready,
int64 ready_size) {
void FileUploader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, std::string bitmask,
bool is_ready, int64 ready_size) {
callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_},
ready_size);
if (is_ready) {

View File

@ -62,7 +62,8 @@ class FileUploader : public FileLoader {
void after_start_parts() override;
Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, bool is_ready, int64 ready_size) override;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, string ready_bitmask, bool is_ready,
int64 ready_size) override;
FileLoader::Callback *get_callback() override;
Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location) override TD_WARN_UNUSED_RESULT;

View File

@ -29,6 +29,27 @@ Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, con
return init_no_size(part_size, ready_parts);
}
void PartsManager::set_streaming_offset(int64 offset) {
if (need_check_ || (!unknown_size_flag_ && get_size() < offset)) {
streaming_offset_ = 0;
return;
}
auto part_i = offset / part_size_;
if (part_i > MAX_PART_COUNT || part_i < 0) {
streaming_offset_ = 0;
// error?
return;
}
streaming_offset_ = offset;
first_streaming_empty_part_ = narrow_cast<int>(part_i);
if (part_count_ <= first_streaming_empty_part_) {
part_count_ = first_streaming_empty_part_ + 1;
part_status_.resize(part_count_, PartStatus::Empty);
}
}
Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &ready_parts) {
unknown_size_flag_ = true;
size_ = 0;
@ -121,6 +142,14 @@ void PartsManager::update_first_empty_part() {
while (first_empty_part_ < part_count_ && part_status_[first_empty_part_] != PartStatus::Empty) {
first_empty_part_++;
}
if (streaming_offset_ == 0) {
first_streaming_empty_part_ = first_empty_part_;
return;
}
while (first_streaming_empty_part_ < part_count_ && part_status_[first_streaming_empty_part_] != PartStatus::Empty) {
first_streaming_empty_part_++;
}
}
void PartsManager::update_first_not_ready_part() {
@ -144,10 +173,14 @@ int32 PartsManager::get_ready_prefix_count() {
}
return res;
}
string PartsManager::get_bitmask() const {
return bitmask_.encode();
}
Result<Part> PartsManager::start_part() {
update_first_empty_part();
if (first_empty_part_ == part_count_) {
auto part_i = first_streaming_empty_part_;
if (part_i == part_count_) {
if (unknown_size_flag_) {
if (known_prefix_flag_ == false) {
part_count_++;
@ -159,13 +192,16 @@ Result<Part> PartsManager::start_part() {
return Status::Error(1, "Wait for prefix to be known");
}
} else {
return get_empty_part();
if (first_empty_part_ < part_count_) {
part_i = first_empty_part_;
} else {
return get_empty_part();
}
}
}
CHECK(part_status_[first_empty_part_] == PartStatus::Empty);
int id = first_empty_part_;
on_part_start(id);
return get_part(id);
CHECK(part_status_[part_i] == PartStatus::Empty);
on_part_start(part_i);
return get_part(part_i);
}
Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
@ -200,6 +236,7 @@ Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size)
pending_count_--;
part_status_[id] = PartStatus::Ready;
bitmask_.set(id);
ready_size_ += narrow_cast<int64>(actual_size);
VLOG(files) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;
@ -241,6 +278,14 @@ void PartsManager::on_part_failed(int32 id) {
if (id < first_empty_part_) {
first_empty_part_ = id;
}
if (streaming_offset_ == 0) {
first_streaming_empty_part_ = id;
return;
}
auto part_i = narrow_cast<int>(streaming_offset_ / part_size_);
if (id >= part_i && id < first_streaming_empty_part_) {
first_streaming_empty_part_ = id;
}
}
int64 PartsManager::get_size() const {
@ -280,6 +325,7 @@ void PartsManager::init_common(const std::vector<int> &ready_parts) {
for (auto i : ready_parts) {
CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_);
part_status_[i] = PartStatus::Ready;
bitmask_.set(i);
auto part = get_part(i);
ready_size_ += narrow_cast<int64>(part.size);
}
@ -289,6 +335,7 @@ void PartsManager::init_common(const std::vector<int> &ready_parts) {
void PartsManager::set_need_check() {
need_check_ = true;
set_streaming_offset(0);
}
void PartsManager::set_checked_prefix_size(int64 size) {

View File

@ -9,6 +9,8 @@
#include "td/utils/common.h"
#include "td/utils/Status.h"
#include "td/telegram/files/FileBitmask.h"
namespace td {
/*** PartsManager***/
@ -33,6 +35,7 @@ class PartsManager {
Status set_known_prefix(size_t size, bool is_ready);
void set_need_check();
void set_checked_prefix_size(int64 size);
void set_streaming_offset(int64 offset);
int64 get_checked_prefix_size() const;
int64 get_unchecked_ready_prefix_size();
@ -44,6 +47,7 @@ class PartsManager {
int32 get_part_count() const;
int32 get_unchecked_ready_prefix_count();
int32 get_ready_prefix_count();
string get_bitmask() const;
private:
static constexpr int MAX_PART_COUNT = 3000;
@ -70,7 +74,10 @@ class PartsManager {
int pending_count_;
int first_empty_part_;
int first_not_ready_part_;
int64 streaming_offset_{0};
int first_streaming_empty_part_;
vector<PartStatus> part_status_;
Bitmask bitmask_;
bool use_part_count_limit_;
void init_common(const vector<int> &ready_parts);

View File

@ -125,5 +125,55 @@ string url_encode(Slice str) {
CHECK(result.size() == length);
return result;
}
namespace detail {
template <class F>
string x_decode(Slice s, F &&f) {
string res;
for (size_t n = s.size(), i = 0; i < n; i++) {
if (i + 1 < n && f(s[i])) {
res.append(static_cast<unsigned char>(s[i + 1]), s[i]);
i++;
continue;
}
res.push_back(s[i]);
}
return res;
}
template <class F>
string x_encode(Slice s, F &&f) {
string res;
for (size_t n = s.size(), i = 0; i < n; i++) {
res.push_back(s[i]);
if (f(s[i])) {
unsigned char cnt = 1;
while (cnt < 250 && i + cnt < n && s[i + cnt] == s[i]) {
cnt++;
}
res.push_back(cnt);
i += cnt - 1;
}
}
return res;
}
bool is_zero(uint8 c) {
return c == 0;
}
bool is_zero_or_one(uint8 c) {
return c == 0 || c == 0xff;
}
} // namespace detail
std::string zero_encode(Slice data) {
return detail::x_encode(data, detail::is_zero);
}
std::string zero_decode(Slice data) {
return detail::x_decode(data, detail::is_zero);
}
std::string zero_one_encode(Slice data) {
return detail::x_encode(data, detail::is_zero_or_one);
}
std::string zero_one_decode(Slice data) {
return detail::x_decode(data, detail::is_zero_or_one);
}
} // namespace td

View File

@ -380,4 +380,8 @@ detail::reversion_wrapper<T> reversed(T &iterable) {
return {iterable};
}
std::string zero_encode(Slice data);
std::string zero_decode(Slice data);
std::string zero_one_encode(Slice data);
std::string zero_one_decode(Slice data);
} // namespace td

View File

@ -612,7 +612,7 @@ class CheckTestC : public Task {
if (text.substr(0, tag_.size()) == tag_) {
file_id_to_check_ = messageDocument->document_->document_->id_;
LOG(ERROR) << "GOT FILE " << to_string(messageDocument->document_->document_);
this->send_query(make_tl_object<td_api::downloadFile>(file_id_to_check_, 1),
this->send_query(make_tl_object<td_api::downloadFile>(file_id_to_check_, 1, 0),
[](auto res) { check_td_error(res); });
}
}