Files: pass file size from FileDownloader to FileManager

GitOrigin-RevId: eb02bc323973b26b0306a84c5efadbb581bbef82
This commit is contained in:
Arseny Smirnov 2018-12-27 00:42:26 +03:00
parent dddb598b58
commit da6a7ec512
10 changed files with 59 additions and 41 deletions

View File

@ -346,27 +346,28 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
} }
return written; return written;
} }
void FileDownloader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask, void FileDownloader::on_progress(Progress progress) {
bool is_ready, int64 ready_size) { if (progress.is_ready) {
if (is_ready) {
// do not send partial location. will lead to wrong local_size // do not send partial location. will lead to wrong local_size
return; return;
} }
if (ready_size == 0 || path_.empty()) { if (progress.ready_size == 0 || path_.empty()) {
return; return;
} }
if (encryption_key_.empty() || encryption_key_.is_secure()) { if (encryption_key_.empty() || encryption_key_.is_secure()) {
callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", ready_bitmask}, callback_->on_partial_download(
ready_size); PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_, "", std::move(progress.ready_bitmask)},
progress.ready_size, progress.size);
} else if (encryption_key_.is_secret()) { } else if (encryption_key_.is_secret()) {
UInt256 iv; UInt256 iv;
if (ready_part_count == next_part_) { if (progress.ready_part_count == next_part_) {
iv = encryption_key_.mutable_iv(); iv = encryption_key_.mutable_iv();
} else { } else {
LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_); LOG(FATAL) << tag("ready_part_count", progress.ready_part_count) << tag("next_part", next_part_);
} }
callback_->on_partial_download( callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_,
PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(), ready_bitmask}, ready_size); as_slice(iv).str(), std::move(progress.ready_bitmask)},
progress.ready_size, progress.size);
} else { } else {
UNREACHABLE(); UNREACHABLE();
} }

View File

@ -27,7 +27,7 @@ class FileDownloader : public FileLoader {
class Callback : public FileLoader::Callback { class Callback : public FileLoader::Callback {
public: public:
virtual void on_start_download() = 0; virtual void on_start_download() = 0;
virtual void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size) = 0; virtual void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size, int64 size) = 0;
virtual void on_ok(const FullLocalFileLocation &full_local, int64 size) = 0; virtual void on_ok(const FullLocalFileLocation &full_local, int64 size) = 0;
virtual void on_error(Status status) = 0; virtual void on_error(Status status) = 0;
}; };
@ -85,8 +85,7 @@ class FileDownloader : public FileLoader {
Result<bool> should_restart_part(Part part, NetQueryPtr &net_query) override TD_WARN_UNUSED_RESULT; Result<bool> should_restart_part(Part part, NetQueryPtr &net_query) override TD_WARN_UNUSED_RESULT;
Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT; Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT; Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask, void on_progress(Progress progress) override;
bool is_ready, int64 ready_size) override;
FileLoader::Callback *get_callback() override; FileLoader::Callback *get_callback() override;
Status process_check_query(NetQueryPtr net_query) override; Status process_check_query(NetQueryPtr net_query) override;
Result<CheckInfo> check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) override; Result<CheckInfo> check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) override;

View File

@ -187,14 +187,14 @@ void FileLoadManager::on_start_download() {
} }
} }
void FileLoadManager::on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size) { void FileLoadManager::on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size, int64 size) {
auto node_id = get_link_token(); auto node_id = get_link_token();
auto node = nodes_container_.get(node_id); auto node = nodes_container_.get(node_id);
if (node == nullptr) { if (node == nullptr) {
return; return;
} }
if (!stop_flag_) { if (!stop_flag_) {
send_closure(callback_, &Callback::on_partial_download, node->query_id_, partial_local, ready_size); send_closure(callback_, &Callback::on_partial_download, node->query_id_, partial_local, ready_size, size);
} }
} }

View File

@ -35,7 +35,8 @@ class FileLoadManager final : public Actor {
Callback &operator=(const Callback &) = delete; Callback &operator=(const Callback &) = delete;
~Callback() override = default; ~Callback() override = default;
virtual void on_start_download(QueryId id) = 0; virtual void on_start_download(QueryId id) = 0;
virtual void on_partial_download(QueryId id, const PartialLocalFileLocation &partial_local, int64 ready_size) = 0; virtual void on_partial_download(QueryId id, const PartialLocalFileLocation &partial_local, int64 ready_size,
int64 size) = 0;
virtual void on_partial_upload(QueryId id, const PartialRemoteFileLocation &partial_remote, int64 ready_size) = 0; virtual void on_partial_upload(QueryId id, const PartialRemoteFileLocation &partial_remote, int64 ready_size) = 0;
virtual void on_hash(QueryId id, string hash) = 0; virtual void on_hash(QueryId id, string hash) = 0;
virtual void on_upload_ok(QueryId id, FileType file_type, const PartialRemoteFileLocation &remtoe, int64 size) = 0; virtual void on_upload_ok(QueryId id, FileType file_type, const PartialRemoteFileLocation &remtoe, int64 size) = 0;
@ -84,7 +85,7 @@ class FileLoadManager final : public Actor {
ActorOwn<ResourceManager> &get_download_resource_manager(bool is_small, DcId dc_id); ActorOwn<ResourceManager> &get_download_resource_manager(bool is_small, DcId dc_id);
void on_start_download(); void on_start_download();
void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size); void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size, int64 size);
void on_partial_upload(const PartialRemoteFileLocation &partial_remote, int64 ready_size); void on_partial_upload(const PartialRemoteFileLocation &partial_remote, int64 ready_size);
void on_hash(string hash); void on_hash(string hash);
void on_ok_download(const FullLocalFileLocation &local, int64 size); void on_ok_download(const FullLocalFileLocation &local, int64 size);
@ -104,8 +105,8 @@ class FileLoadManager final : public Actor {
void on_start_download() override { void on_start_download() override {
send_closure(actor_id_, &FileLoadManager::on_start_download); send_closure(actor_id_, &FileLoadManager::on_start_download);
} }
void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size) override { void on_partial_download(const PartialLocalFileLocation &partial_local, int64 ready_size, int64 size) override {
send_closure(actor_id_, &FileLoadManager::on_partial_download, partial_local, ready_size); send_closure(actor_id_, &FileLoadManager::on_partial_download, partial_local, ready_size, size);
} }
void on_ok(const FullLocalFileLocation &full_local, int64 size) override { void on_ok(const FullLocalFileLocation &full_local, int64 size) override {
send_closure(std::move(actor_id_), &FileLoadManager::on_ok_download, full_local, size); send_closure(std::move(actor_id_), &FileLoadManager::on_ok_download, full_local, size);

View File

@ -106,7 +106,7 @@ void FileLoader::start_up() {
} }
resource_state_.set_unit_size(parts_manager_.get_part_size()); resource_state_.set_unit_size(parts_manager_.get_part_size());
update_estimated_limit(); update_estimated_limit();
on_progress_impl(narrow_cast<size_t>(parts_manager_.get_ready_size())); on_progress_impl();
yield(); yield();
} }
@ -129,7 +129,7 @@ Status FileLoader::do_loop() {
check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(), check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
parts_manager_.unchecked_ready())); parts_manager_.unchecked_ready()));
if (check_info.changed) { if (check_info.changed) {
on_progress_impl(narrow_cast<size_t>(parts_manager_.get_ready_size())); on_progress_impl();
} }
for (auto &query : check_info.queries) { for (auto &query : check_info.queries) {
G()->net_query_dispatcher().dispatch_with_callback( G()->net_query_dispatcher().dispatch_with_callback(
@ -294,14 +294,20 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
debug_bad_parts_.push_back(part.id); debug_bad_parts_.push_back(part.id);
debug_bad_part_order_++; debug_bad_part_order_++;
} }
on_progress_impl(size); on_progress_impl();
return Status::OK(); return Status::OK();
} }
void FileLoader::on_progress_impl(size_t size) { void FileLoader::on_progress_impl() {
on_progress(parts_manager_.get_part_count(), static_cast<int32>(parts_manager_.get_part_size()), Progress progress;
parts_manager_.get_ready_prefix_count(), parts_manager_.get_bitmask(), parts_manager_.ready(), progress.part_count = parts_manager_.get_part_count();
parts_manager_.get_ready_size()); progress.part_size = static_cast<int32>(parts_manager_.get_part_size());
progress.ready_part_count = parts_manager_.get_ready_prefix_count();
progress.ready_bitmask = parts_manager_.get_bitmask();
progress.is_ready = parts_manager_.ready();
progress.ready_size = parts_manager_.get_ready_size();
progress.size = parts_manager_.get_size_or_zero();
on_progress(std::move(progress));
} }
} // namespace td } // namespace td

View File

@ -70,8 +70,16 @@ class FileLoader : public FileLoaderActor {
virtual void after_start_parts() { virtual void after_start_parts() {
} }
virtual Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT = 0; virtual Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT = 0;
virtual void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask, struct Progress {
bool is_ready, int64 ready_size) = 0; int32 part_count{0};
int32 part_size{0};
int32 ready_part_count{0};
string ready_bitmask;
bool is_ready{false};
int64 ready_size{0};
int64 size{0};
};
virtual void on_progress(Progress progress) = 0;
virtual Callback *get_callback() = 0; virtual Callback *get_callback() = 0;
virtual Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location, virtual Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location,
int64 file_size) TD_WARN_UNUSED_RESULT { int64 file_size) TD_WARN_UNUSED_RESULT {
@ -121,7 +129,7 @@ class FileLoader : public FileLoaderActor {
void tear_down() override; void tear_down() override;
void update_estimated_limit(); void update_estimated_limit();
void on_progress_impl(size_t size); void on_progress_impl();
void on_result(NetQueryPtr query) override; void on_result(NetQueryPtr query) override;
void on_part_query(Part part, NetQueryPtr query); void on_part_query(Part part, NetQueryPtr query);

View File

@ -2271,8 +2271,8 @@ void FileManager::on_start_download(QueryId query_id) {
file_node->is_download_started_ = true; file_node->is_download_started_ = true;
} }
void FileManager::on_partial_download(QueryId query_id, const PartialLocalFileLocation &partial_local, void FileManager::on_partial_download(QueryId query_id, const PartialLocalFileLocation &partial_local, int64 ready_size,
int64 ready_size) { int64 size) {
if (is_closed_) { if (is_closed_) {
return; return;
} }
@ -2290,6 +2290,9 @@ void FileManager::on_partial_download(QueryId query_id, const PartialLocalFileLo
return; return;
} }
if (size != 0) {
file_node->set_size(size);
}
file_node->set_local_location(LocalFileLocation(partial_local), ready_size, -1, -1 /* TODO */); file_node->set_local_location(LocalFileLocation(partial_local), ready_size, -1, -1 /* TODO */);
try_flush_node(file_node); try_flush_node(file_node);
} }

View File

@ -495,7 +495,8 @@ class FileManager : public FileLoadManager::Callback {
void run_generate(FileNodePtr node); void run_generate(FileNodePtr node);
void on_start_download(QueryId query_id) override; void on_start_download(QueryId query_id) override;
void on_partial_download(QueryId query_id, const PartialLocalFileLocation &partial_local, int64 ready_size) override; void on_partial_download(QueryId query_id, const PartialLocalFileLocation &partial_local, int64 ready_size,
int64 size) override;
void on_hash(QueryId query_id, string hash) override; void on_hash(QueryId query_id, string hash) override;
void on_partial_upload(QueryId query_id, const PartialRemoteFileLocation &partial_remote, int64 ready_size) override; void on_partial_upload(QueryId query_id, const PartialRemoteFileLocation &partial_remote, int64 ready_size) override;
void on_download_ok(QueryId query_id, const FullLocalFileLocation &local, int64 size) override; void on_download_ok(QueryId query_id, const FullLocalFileLocation &local, int64 size) override;

View File

@ -255,7 +255,6 @@ Result<std::pair<NetQueryPtr, bool>> FileUploader::start_part(Part part, int32 p
} }
if (size != part.size) { if (size != part.size) {
LOG(ERROR) << "Need to read " << part.size << " bytes, but read " << size << " bytes instead";
return Status::Error("Failed to read file part"); return Status::Error("Failed to read file part");
} }
@ -295,13 +294,14 @@ Result<size_t> FileUploader::process_part(Part part, NetQueryPtr net_query) {
return part.size; return part.size;
} }
void FileUploader::on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask, void FileUploader::on_progress(Progress progress) {
bool is_ready, int64 ready_size) { callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size,
callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_}, progress.ready_part_count, big_flag_},
ready_size); progress.ready_size);
if (is_ready) { if (progress.is_ready) {
callback_->on_ok(file_type_, callback_->on_ok(file_type_,
PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_}, PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size,
progress.ready_part_count, big_flag_},
local_size_); local_size_);
} }
} }

View File

@ -63,8 +63,7 @@ class FileUploader : public FileLoader {
void after_start_parts() override; void after_start_parts() override;
Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT; Result<std::pair<NetQueryPtr, bool>> start_part(Part part, int32 part_count) override TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT; Result<size_t> process_part(Part part, NetQueryPtr net_query) override TD_WARN_UNUSED_RESULT;
void on_progress(int32 part_count, int32 part_size, int32 ready_part_count, const string &ready_bitmask, void on_progress(Progress progress) override;
bool is_ready, int64 ready_size) override;
FileLoader::Callback *get_callback() override; FileLoader::Callback *get_callback() override;
Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location, Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location,
int64 file_size) override TD_WARN_UNUSED_RESULT; int64 file_size) override TD_WARN_UNUSED_RESULT;