PartsManager: fixes
GitOrigin-RevId: 7e4c663f53aa8b146cf9119ddf8e9fa680bb738b
This commit is contained in:
parent
e9ba66858d
commit
ffd7b166d9
@ -148,7 +148,7 @@ Status FileLoader::do_loop() {
|
||||
parts_manager_.set_checked_prefix_size(check_info.checked_prefix_size);
|
||||
}
|
||||
|
||||
if (parts_manager_.ready()) {
|
||||
if (parts_manager_.may_finish()) {
|
||||
TRY_STATUS(parts_manager_.finish());
|
||||
TRY_STATUS(on_ok(parts_manager_.get_size()));
|
||||
LOG(INFO) << "Bad download order rate: "
|
||||
|
@ -44,6 +44,7 @@ void PartsManager::set_streaming_offset(int64 offset) {
|
||||
|
||||
streaming_offset_ = offset;
|
||||
first_streaming_empty_part_ = narrow_cast<int>(part_i);
|
||||
first_streaming_not_ready_part_ = narrow_cast<int>(part_i);
|
||||
if (part_count_ < first_streaming_empty_part_) {
|
||||
part_count_ = first_streaming_empty_part_;
|
||||
part_status_.resize(part_count_, PartStatus::Empty);
|
||||
@ -126,11 +127,20 @@ bool PartsManager::unchecked_ready() {
|
||||
<< ", checked_prefix_size = " << checked_prefix_size_;
|
||||
return !unknown_size_flag_ && ready_size_ == size_;
|
||||
}
|
||||
bool PartsManager::may_finish() {
|
||||
if (is_streaming_limit_reached()) {
|
||||
return true;
|
||||
}
|
||||
return ready();
|
||||
}
|
||||
bool PartsManager::ready() {
|
||||
return unchecked_ready() && (!need_check_ || checked_prefix_size_ == size_);
|
||||
}
|
||||
|
||||
Status PartsManager::finish() {
|
||||
if (is_streaming_limit_reached()) {
|
||||
return Status::Error("FILE_DOWNLOAD_LIMIT");
|
||||
}
|
||||
if (!ready()) {
|
||||
return Status::Error("File transferring not finished");
|
||||
}
|
||||
@ -155,6 +165,14 @@ void PartsManager::update_first_not_ready_part() {
|
||||
while (first_not_ready_part_ < part_count_ && part_status_[first_not_ready_part_] == PartStatus::Ready) {
|
||||
first_not_ready_part_++;
|
||||
}
|
||||
if (streaming_offset_ == 0) {
|
||||
first_streaming_not_ready_part_ = first_not_ready_part_;
|
||||
return;
|
||||
}
|
||||
while (first_streaming_not_ready_part_ < part_count_ &&
|
||||
part_status_[first_streaming_not_ready_part_] == PartStatus::Ready) {
|
||||
first_streaming_not_ready_part_++;
|
||||
}
|
||||
}
|
||||
|
||||
int32 PartsManager::get_unchecked_ready_prefix_count() {
|
||||
@ -180,6 +198,48 @@ string PartsManager::get_bitmask() {
|
||||
return bitmask_.encode(prefix_count);
|
||||
}
|
||||
|
||||
bool PartsManager::is_part_in_streaming_limit(int part_i) const {
|
||||
auto offset_begin = static_cast<int64>(part_i * get_part_size());
|
||||
auto offset_end = static_cast<int64>(offset_begin + get_part(part_i).size);
|
||||
|
||||
if (offset_begin >= get_expected_size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (streaming_limit_ == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto is_intersect_with = [&](int64 begin, int64 end) {
|
||||
return std::max(begin, offset_begin) < std::min(end, offset_end);
|
||||
};
|
||||
|
||||
auto streaming_begin = streaming_offset_;
|
||||
auto streaming_end = streaming_offset_ + streaming_limit_;
|
||||
if (is_intersect_with(streaming_begin, streaming_end)) {
|
||||
return true;
|
||||
}
|
||||
// wrap limit
|
||||
if (!unknown_size_flag_ && streaming_end > get_size() && is_intersect_with(0, streaming_end - get_size())) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PartsManager::is_streaming_limit_reached() {
|
||||
if (streaming_limit_ == 0) {
|
||||
return false;
|
||||
}
|
||||
update_first_not_ready_part();
|
||||
auto part_i = first_streaming_not_ready_part_;
|
||||
|
||||
// wrap
|
||||
if (!unknown_size_flag_ && part_i == part_count_) {
|
||||
part_i = first_not_ready_part_;
|
||||
}
|
||||
return !is_part_in_streaming_limit(part_i);
|
||||
}
|
||||
|
||||
Result<Part> PartsManager::start_part() {
|
||||
update_first_empty_part();
|
||||
auto part_i = first_streaming_empty_part_;
|
||||
@ -202,26 +262,9 @@ Result<Part> PartsManager::start_part() {
|
||||
}
|
||||
}
|
||||
|
||||
if (streaming_limit_ != 0) {
|
||||
auto offset = static_cast<int64>(part_i * get_part_size());
|
||||
int64 ready = 0;
|
||||
if (part_i == first_streaming_empty_part_) {
|
||||
if (offset > streaming_offset_) {
|
||||
ready += offset - streaming_offset_;
|
||||
if (!is_part_in_streaming_limit(part_i)) {
|
||||
return get_empty_part();
|
||||
}
|
||||
} else {
|
||||
ready += offset;
|
||||
CHECK(!unknown_size_flag_);
|
||||
if (streaming_offset_ < size_) {
|
||||
ready += size_ - streaming_offset_;
|
||||
}
|
||||
}
|
||||
|
||||
if (ready >= streaming_limit_) {
|
||||
return Status::Error("FILE_DOWNLOAD_LIMIT");
|
||||
}
|
||||
}
|
||||
|
||||
CHECK(part_status_[part_i] == PartStatus::Empty);
|
||||
on_part_start(part_i);
|
||||
return get_part(part_i);
|
||||
@ -323,7 +366,38 @@ int64 PartsManager::get_size_or_zero() const {
|
||||
int64 PartsManager::get_estimated_extra() const {
|
||||
auto total_estimated_extra = get_expected_size() - get_ready_size();
|
||||
if (streaming_limit_ != 0) {
|
||||
return td::min(streaming_limit_, total_estimated_extra);
|
||||
int64 expected_size = get_expected_size();
|
||||
int64 streaming_begin = streaming_offset_ / get_part_size() * get_part_size();
|
||||
int64 streaming_end =
|
||||
(streaming_offset_ + streaming_limit_ + get_part_size() - 1) / get_part_size() * get_part_size();
|
||||
int64 streaming_size = streaming_end - streaming_begin;
|
||||
if (unknown_size_flag_) {
|
||||
if (streaming_begin < expected_size) {
|
||||
streaming_size = min(expected_size - streaming_begin, streaming_size);
|
||||
} else {
|
||||
streaming_size = 0;
|
||||
}
|
||||
} else {
|
||||
if (streaming_end > expected_size) {
|
||||
int64 total = streaming_limit_;
|
||||
int64 suffix = 0;
|
||||
if (streaming_offset_ < expected_size_) {
|
||||
suffix = expected_size_ - streaming_begin;
|
||||
total -= expected_size_ - streaming_offset_;
|
||||
}
|
||||
int64 prefix = (total + get_part_size() - 1) / get_part_size() * get_part_size();
|
||||
streaming_size = min(expected_size, prefix + suffix);
|
||||
}
|
||||
}
|
||||
//TODO: optimize
|
||||
int64 res = streaming_size;
|
||||
for (int part_i = 0; part_i < part_count_; part_i++) {
|
||||
if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
|
||||
res -= get_part(part_i).size;
|
||||
}
|
||||
}
|
||||
CHECK(res >= 0);
|
||||
return res;
|
||||
}
|
||||
return total_estimated_extra;
|
||||
}
|
||||
@ -392,7 +466,7 @@ int64 PartsManager::get_unchecked_ready_prefix_size() {
|
||||
return res;
|
||||
}
|
||||
|
||||
Part PartsManager::get_part(int id) {
|
||||
Part PartsManager::get_part(int id) const {
|
||||
int64 offset = narrow_cast<int64>(part_size_) * id;
|
||||
int64 size = narrow_cast<int64>(part_size_);
|
||||
if (!unknown_size_flag_) {
|
||||
|
@ -24,6 +24,7 @@ class PartsManager {
|
||||
public:
|
||||
Status init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
|
||||
const std::vector<int> &ready_parts, bool use_part_count_limit) TD_WARN_UNUSED_RESULT;
|
||||
bool may_finish();
|
||||
bool ready();
|
||||
bool unchecked_ready();
|
||||
Status finish() TD_WARN_UNUSED_RESULT;
|
||||
@ -79,6 +80,7 @@ class PartsManager {
|
||||
int64 streaming_offset_{0};
|
||||
int64 streaming_limit_{0};
|
||||
int first_streaming_empty_part_;
|
||||
int first_streaming_not_ready_part_;
|
||||
vector<PartStatus> part_status_;
|
||||
Bitmask bitmask_;
|
||||
bool use_part_count_limit_;
|
||||
@ -88,11 +90,14 @@ class PartsManager {
|
||||
const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
||||
Status init_no_size(size_t part_size, const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
||||
|
||||
Part get_part(int id);
|
||||
Part get_part(int id) const;
|
||||
Part get_empty_part();
|
||||
void on_part_start(int32 id);
|
||||
void update_first_empty_part();
|
||||
void update_first_not_ready_part();
|
||||
|
||||
bool is_streaming_limit_reached();
|
||||
bool is_part_in_streaming_limit(int part_i) const;
|
||||
};
|
||||
|
||||
} // namespace td
|
||||
|
Reference in New Issue
Block a user