FileLoader: smarter cancelling of queries when offset changes

GitOrigin-RevId: 2131d416eda29d93d85e8d655964c0a92cdfaf57
This commit is contained in:
Arseny Smirnov 2020-08-21 15:53:11 +03:00
parent 2c4953386a
commit 4f41cfcbff
3 changed files with 23 additions and 7 deletions

View File

@ -67,10 +67,13 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) {
void FileLoader::update_download_offset(int64 offset) {
if (parts_manager_.get_streaming_offset() != offset) {
parts_manager_.set_streaming_offset(offset);
uint64 begin_part_id = parts_manager_.set_streaming_offset(offset);
uint64 end_part_id = begin_part_id + part_map_.size();
//TODO: cancel only some queries
for (auto &it : part_map_) {
it.second.second.reset(); // cancel_query(it.second.second);
if (!(begin_part_id <= it.first && it.first < end_part_id)) {
it.second.second.reset(); // cancel_query(it.second.second);
}
}
}
update_estimated_limit();
@ -259,6 +262,7 @@ void FileLoader::on_result(NetQueryPtr query) {
Part part = it->second.first;
it->second.second.release();
CHECK(query->is_ready());
part_map_.erase(it);
bool next = false;
auto status = [&] {

View File

@ -29,22 +29,27 @@ Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, con
return init_no_size(part_size, ready_parts);
}
void PartsManager::set_streaming_offset(int64 offset) {
SCOPE_EXIT {
int32 PartsManager::set_streaming_offset(int64 offset) {
auto finish = [&] {
set_streaming_limit(streaming_limit_);
update_first_empty_part();
return first_streaming_empty_part_;
};
if (offset < 0 || need_check_ || (!unknown_size_flag_ && get_size() < offset)) {
streaming_offset_ = 0;
LOG_IF(ERROR, offset != 0) << "Ignore streaming_offset " << offset << ", need_check_ = " << need_check_
<< ", unknown_size_flag_ = " << unknown_size_flag_ << ", size = " << get_size();
return;
return finish();
}
auto part_i = offset / part_size_;
if (use_part_count_limit_ && part_i >= MAX_PART_COUNT) {
streaming_offset_ = 0;
LOG(ERROR) << "Ignore streaming_offset " << offset << " in part " << part_i;
return;
return finish();
}
streaming_offset_ = offset;
@ -54,6 +59,12 @@ void PartsManager::set_streaming_offset(int64 offset) {
part_count_ = first_streaming_empty_part_;
part_status_.resize(part_count_, PartStatus::Empty);
}
return finish();
}
int32 PartsManager::get_pending_count() {
return pending_count_;
}
void PartsManager::set_streaming_limit(int64 limit) {

View File

@ -35,7 +35,7 @@ class PartsManager {
Status set_known_prefix(size_t size, bool is_ready);
void set_need_check();
void set_checked_prefix_size(int64 size);
void set_streaming_offset(int64 offset);
int32 set_streaming_offset(int64 offset);
void set_streaming_limit(int64 limit);
int64 get_checked_prefix_size() const;
@ -51,6 +51,7 @@ class PartsManager {
int32 get_ready_prefix_count();
int64 get_streaming_offset() const;
string get_bitmask();
int32 get_pending_count();
private:
static constexpr int MAX_PART_COUNT = 4000;