// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/telegram/files/FileLoader.h" #include "td/telegram/files/ResourceManager.h" #include "td/telegram/Global.h" #include "td/telegram/net/NetQueryDispatcher.h" #include "td/telegram/UniqueId.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/ScopeGuard.h" #include #include namespace td { void FileLoader::set_resource_manager(ActorShared resource_manager) { resource_manager_ = std::move(resource_manager); send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); } void FileLoader::update_priority(int8 priority) { send_closure(resource_manager_, &ResourceManager::update_priority, priority); } void FileLoader::update_resources(const ResourceState &other) { resource_state_.update_slave(other); VLOG(files) << "update resources " << resource_state_; loop(); } void FileLoader::set_ordered_flag(bool flag) { ordered_flag_ = flag; } size_t FileLoader::get_part_size() const { return parts_manager_.get_part_size(); } void FileLoader::hangup() { // if (!stop_flag_) { // stop_flag_ = true; // on_error(Status::Error("Cancelled")); //} stop(); } void FileLoader::update_local_file_location(const LocalFileLocation &local) { auto r_prefix_info = on_update_local_location(local, parts_manager_.get_size_or_zero()); if (r_prefix_info.is_error()) { on_error(r_prefix_info.move_as_error()); stop_flag_ = true; return; } auto prefix_info = r_prefix_info.move_as_ok(); auto status = parts_manager_.set_known_prefix(narrow_cast(prefix_info.size), prefix_info.is_ready); if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; return; } loop(); } void FileLoader::update_download_offset(int64 offset) { parts_manager_.set_streaming_offset(offset); //TODO: cancel only some queries for (auto &it : part_map_) { it.second.second.reset(); // cancel_query(it.second.second); } loop(); } void FileLoader::start_up() { auto r_file_info = init(); if (r_file_info.is_error()) { on_error(r_file_info.move_as_error()); stop_flag_ = true; return; } auto file_info = r_file_info.ok(); auto size = file_info.size; auto expected_size = max(size, file_info.expected_size); bool is_size_final = file_info.is_size_final; auto part_size = file_info.part_size; auto &ready_parts = file_info.ready_parts; auto use_part_count_limit = file_info.use_part_count_limit; auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit); if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; return; } if (file_info.only_check) { parts_manager_.set_checked_prefix_size(0); } parts_manager_.set_streaming_offset(file_info.offset); if (ordered_flag_) { ordered_parts_ = OrderedEventsProcessor>(parts_manager_.get_ready_prefix_count()); } if (file_info.need_delay) { delay_dispatcher_ = create_actor("DelayDispatcher", 0.003); next_delay_ = 0.05; } resource_state_.set_unit_size(parts_manager_.get_part_size()); update_estimated_limit(); on_progress_impl(narrow_cast(parts_manager_.get_ready_size())); yield(); } void FileLoader::loop() { if (stop_flag_) { return; } auto status = do_loop(); if (status.is_error()) { if (status.code() == 1) { return; } on_error(std::move(status)); stop_flag_ = true; return; } } Status FileLoader::do_loop() { TRY_RESULT(check_info, check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(), parts_manager_.unchecked_ready())); if (check_info.changed) { on_progress_impl(narrow_cast(parts_manager_.get_ready_size())); } for (auto &query : check_info.queries) { G()->net_query_dispatcher().dispatch_with_callback( std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY))); } if (check_info.need_check) { parts_manager_.set_need_check(); parts_manager_.set_checked_prefix_size(check_info.checked_prefix_size); } if (parts_manager_.ready()) { TRY_STATUS(parts_manager_.finish()); TRY_STATUS(on_ok(parts_manager_.get_size())); LOG(INFO) << "Bad download order rate: " << (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% " << debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_); stop_flag_ = true; return Status::OK(); } TRY_STATUS(before_start_parts()); SCOPE_EXIT { after_start_parts(); }; while (true) { if (blocking_id_ != 0) { break; } if (resource_state_.unused() < static_cast(parts_manager_.get_part_size())) { VLOG(files) << "Got only " << resource_state_.unused() << " resource"; break; } TRY_RESULT(part, parts_manager_.start_part()); if (part.size == 0) { break; } VLOG(files) << "Start part " << tag("id", part.id) << tag("size", part.size); resource_state_.start_use(static_cast(part.size)); TRY_RESULT(query_flag, start_part(part, parts_manager_.get_part_count())); NetQueryPtr query; bool is_blocking; std::tie(query, is_blocking) = std::move(query_flag); uint64 id = UniqueId::next(); if (is_blocking) { CHECK(blocking_id_ == 0); blocking_id_ = id; } part_map_[id] = std::make_pair(part, query->cancel_slot_.get_signal_new()); // part_map_[id] = std::make_pair(part, query.get_weak()); auto callback = actor_shared(this, id); if (delay_dispatcher_.empty()) { G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback)); } else { send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query), std::move(callback), next_delay_); next_delay_ = max(next_delay_ * 0.8, 0.003); } } return Status::OK(); } void FileLoader::tear_down() { for (auto &it : part_map_) { it.second.second.reset(); // cancel_query(it.second.second); } ordered_parts_.clear([](auto &&part) { part.second->clear(); }); send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent); } void FileLoader::update_estimated_limit() { if (stop_flag_) { return; } auto estimated_exta = parts_manager_.get_expected_size() - parts_manager_.get_ready_size(); resource_state_.update_estimated_limit(estimated_exta); VLOG(files) << "update estimated limit " << estimated_exta; if (!resource_manager_.empty()) { keep_fd_flag(narrow_cast(resource_state_.active_limit()) >= parts_manager_.get_part_size()); send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); } } void FileLoader::on_result(NetQueryPtr query) { if (stop_flag_) { return; } auto id = get_link_token(); if (id == blocking_id_) { blocking_id_ = 0; } if (UniqueId::extract_key(id) == COMMON_QUERY_KEY) { on_common_query(std::move(query)); return loop(); } auto it = part_map_.find(id); if (it == part_map_.end()) { LOG(WARNING) << "Got result for unknown part"; return; } Part part = it->second.first; it->second.second.release(); CHECK(query->is_ready()); bool next = false; auto status = [&] { TRY_RESULT(should_restart, should_restart_part(part, query)); if (should_restart) { VLOG(files) << "Restart part " << tag("id", part.id) << tag("size", part.size); resource_state_.stop_use(static_cast(part.size)); parts_manager_.on_part_failed(part.id); } else { next = true; } return Status::OK(); }(); if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; return; } if (next) { if (ordered_flag_) { auto seq_no = part.id; ordered_parts_.add(seq_no, std::make_pair(part, std::move(query)), [this](auto seq_no, auto &&p) { this->on_part_query(p.first, std::move(p.second)); }); } else { on_part_query(part, std::move(query)); } } update_estimated_limit(); loop(); } void FileLoader::on_part_query(Part part, NetQueryPtr query) { auto status = try_on_part_query(part, std::move(query)); if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; } } void FileLoader::on_common_query(NetQueryPtr query) { auto status = process_check_query(std::move(query)); if (status.is_error()) { on_error(std::move(status)); stop_flag_ = true; } } Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) { TRY_RESULT(size, process_part(part, std::move(query))); VLOG(files) << "Ok part " << tag("id", part.id) << tag("size", part.size); resource_state_.stop_use(static_cast(part.size)); auto old_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count(); TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size)); auto new_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count(); debug_total_parts_++; if (old_ready_prefix_count == new_ready_prefix_count) { debug_bad_parts_.push_back(part.id); debug_bad_part_order_++; } on_progress_impl(size); return Status::OK(); } void FileLoader::on_progress_impl(size_t size) { on_progress(parts_manager_.get_part_count(), static_cast(parts_manager_.get_part_size()), parts_manager_.get_ready_prefix_count(), parts_manager_.get_bitmask(), parts_manager_.ready(), parts_manager_.get_ready_size()); } } // namespace td