Change offset and limit simultaneously.

GitOrigin-RevId: eb07ec70a9048c4b5a5a5e0f1e17431b8528e6de
This commit is contained in:
levlam 2020-08-25 18:58:37 +03:00
parent 3cc97684ec
commit 093ba9c9d4
8 changed files with 33 additions and 44 deletions

View File

@ -158,7 +158,8 @@ void FileLoadManager::update_local_file_location(QueryId id, const LocalFileLoca
} }
send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local); send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local);
} }
void FileLoadManager::update_download_offset(QueryId id, int64 offset) {
void FileLoadManager::update_downloaded_part(QueryId id, int64 offset, int64 limit) {
if (stop_flag_) { if (stop_flag_) {
return; return;
} }
@ -170,22 +171,9 @@ void FileLoadManager::update_download_offset(QueryId id, int64 offset) {
if (node == nullptr) { if (node == nullptr) {
return; return;
} }
send_closure(node->loader_, &FileLoaderActor::update_download_offset, offset); send_closure(node->loader_, &FileLoaderActor::update_downloaded_part, offset, limit);
}
void FileLoadManager::update_download_limit(QueryId id, int64 limit) {
if (stop_flag_) {
return;
}
auto it = query_id_to_node_id_.find(id);
if (it == query_id_to_node_id_.end()) {
return;
}
auto node = nodes_container_.get(it->second);
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_download_limit, limit);
} }
void FileLoadManager::hangup() { void FileLoadManager::hangup() {
nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); }); nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); });
stop_flag_ = true; stop_flag_ = true;

View File

@ -58,8 +58,7 @@ class FileLoadManager final : public Actor {
void from_bytes(QueryId id, FileType type, BufferSlice bytes, string name); void from_bytes(QueryId id, FileType type, BufferSlice bytes, string name);
void cancel(QueryId id); void cancel(QueryId id);
void update_local_file_location(QueryId id, const LocalFileLocation &local); void update_local_file_location(QueryId id, const LocalFileLocation &local);
void update_download_offset(QueryId id, int64 offset); void update_downloaded_part(QueryId id, int64 offset, int64 limit);
void update_download_limit(QueryId id, int64 limit);
void get_content(const FullLocalFileLocation &local_location, Promise<BufferSlice> promise); void get_content(const FullLocalFileLocation &local_location, Promise<BufferSlice> promise);
private: private:

View File

@ -65,9 +65,9 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) {
loop(); loop();
} }
void FileLoader::update_download_offset(int64 offset) { void FileLoader::update_downloaded_part(int64 offset, int64 limit) {
if (parts_manager_.get_streaming_offset() != offset) { if (parts_manager_.get_streaming_offset() != offset) {
auto begin_part_id = parts_manager_.set_streaming_offset(offset); auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit);
auto end_part_id = begin_part_id + static_cast<int32>(part_map_.size()) * 2; auto end_part_id = begin_part_id + static_cast<int32>(part_map_.size()) * 2;
VLOG(files) << "Protect parts " << begin_part_id << " ... " << end_part_id; VLOG(files) << "Protect parts " << begin_part_id << " ... " << end_part_id;
for (auto &it : part_map_) { for (auto &it : part_map_) {
@ -76,17 +76,13 @@ void FileLoader::update_download_offset(int64 offset) {
it.second.second.reset(); // cancel_query(it.second.second); it.second.second.reset(); // cancel_query(it.second.second);
} }
} }
} else {
parts_manager_.set_streaming_limit(limit);
} }
update_estimated_limit(); update_estimated_limit();
loop(); loop();
} }
void FileLoader::update_download_limit(int64 limit) {
parts_manager_.set_streaming_limit(limit);
update_estimated_limit();
loop();
}
void FileLoader::start_up() { void FileLoader::start_up() {
auto r_file_info = init(); auto r_file_info = init();
if (r_file_info.is_error()) { if (r_file_info.is_error()) {
@ -124,8 +120,7 @@ void FileLoader::start_up() {
if (file_info.only_check) { if (file_info.only_check) {
parts_manager_.set_checked_prefix_size(0); parts_manager_.set_checked_prefix_size(0);
} }
parts_manager_.set_streaming_offset(file_info.offset); parts_manager_.set_streaming_offset(file_info.offset, file_info.limit);
parts_manager_.set_streaming_limit(file_info.limit);
if (ordered_flag_) { if (ordered_flag_) {
ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count()); ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
} }

View File

@ -39,8 +39,7 @@ class FileLoader : public FileLoaderActor {
void update_resources(const ResourceState &other) override; void update_resources(const ResourceState &other) override;
void update_local_file_location(const LocalFileLocation &local) override; void update_local_file_location(const LocalFileLocation &local) override;
void update_download_offset(int64 offset) override; void update_downloaded_part(int64 offset, int64 limit) override;
void update_download_limit(int64 limit) override;
protected: protected:
void set_ordered_flag(bool flag); void set_ordered_flag(bool flag);

View File

@ -26,9 +26,7 @@ class FileLoaderActor : public NetQueryCallback {
// TODO: existence of these three functions is a dirty hack. Refactoring is highly appreciated // TODO: existence of these three functions is a dirty hack. Refactoring is highly appreciated
virtual void update_local_file_location(const LocalFileLocation &local) { virtual void update_local_file_location(const LocalFileLocation &local) {
} }
virtual void update_download_offset(int64 offset) { virtual void update_downloaded_part(int64 offset, int64 limit) {
}
virtual void update_download_limit(int64 limit) {
} }
}; };

View File

@ -2231,13 +2231,17 @@ void FileManager::run_download(FileNodePtr node) {
LOG(INFO) << "Update download offset and limits of file " << node->main_file_id_; LOG(INFO) << "Update download offset and limits of file " << node->main_file_id_;
CHECK(node->download_id_ != 0); CHECK(node->download_id_ != 0);
send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority); send_closure(file_load_manager_, &FileLoadManager::update_priority, node->download_id_, priority);
if (need_update_limit) { if (need_update_limit || need_update_offset) {
auto download_offset = node->download_offset_;
auto download_limit = node->download_limit_; auto download_limit = node->download_limit_;
send_closure(file_load_manager_, &FileLoadManager::update_download_limit, node->download_id_, download_limit); if (file_view.is_encrypted_any()) {
} CHECK(download_offset <= MAX_FILE_SIZE);
if (need_update_offset) { CHECK(download_limit <= std::numeric_limits<int32>::max());
auto download_offset = file_view.is_encrypted_any() ? 0 : node->download_offset_; download_limit += download_offset;
send_closure(file_load_manager_, &FileLoadManager::update_download_offset, node->download_id_, download_offset); download_offset = 0;
}
send_closure(file_load_manager_, &FileLoadManager::update_downloaded_part, node->download_id_, download_offset,
download_limit);
} }
return; return;
} }
@ -2297,8 +2301,14 @@ void FileManager::run_download(FileNodePtr node) {
LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from " LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from "
<< node->remote_.full.value() << " with suggested name " << node->suggested_name() << " and encyption key " << node->remote_.full.value() << " with suggested name " << node->suggested_name() << " and encyption key "
<< node->encryption_key_; << node->encryption_key_;
auto download_offset = file_view.is_encrypted_any() ? 0 : node->download_offset_; auto download_offset = node->download_offset_;
auto download_limit = node->download_limit_; auto download_limit = node->download_limit_;
if (file_view.is_encrypted_any()) {
CHECK(download_offset <= MAX_FILE_SIZE);
CHECK(download_limit <= std::numeric_limits<int32>::max());
download_limit += download_offset;
download_offset = 0;
}
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full.value(), node->local_, send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full.value(), node->local_,
node->size_, node->suggested_name(), node->encryption_key_, node->can_search_locally_, download_offset, node->size_, node->suggested_name(), node->encryption_key_, node->can_search_locally_, download_offset,
download_limit, priority); download_limit, priority);

View File

@ -29,9 +29,9 @@ Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, con
return init_no_size(part_size, ready_parts); return init_no_size(part_size, ready_parts);
} }
int32 PartsManager::set_streaming_offset(int64 offset) { int32 PartsManager::set_streaming_offset(int64 offset, int64 limit) {
auto finish = [&] { auto finish = [&] {
set_streaming_limit(streaming_limit_); set_streaming_limit(limit);
update_first_not_ready_part(); update_first_not_ready_part();
return first_streaming_not_ready_part_; return first_streaming_not_ready_part_;
}; };
@ -502,7 +502,7 @@ Status PartsManager::init_common(const std::vector<int> &ready_parts) {
void PartsManager::set_need_check() { void PartsManager::set_need_check() {
need_check_ = true; need_check_ = true;
set_streaming_offset(0); set_streaming_offset(0, 0);
} }
void PartsManager::set_checked_prefix_size(int64 size) { void PartsManager::set_checked_prefix_size(int64 size) {

View File

@ -35,7 +35,7 @@ class PartsManager {
Status set_known_prefix(size_t size, bool is_ready); Status set_known_prefix(size_t size, bool is_ready);
void set_need_check(); void set_need_check();
void set_checked_prefix_size(int64 size); void set_checked_prefix_size(int64 size);
int32 set_streaming_offset(int64 offset); int32 set_streaming_offset(int64 offset, int64 limit);
void set_streaming_limit(int64 limit); void set_streaming_limit(int64 limit);
int64 get_checked_prefix_size() const; int64 get_checked_prefix_size() const;