PartsManager: restart upload when uploaded size is more than we have locally
GitOrigin-RevId: e307dbd150e97b04c73a9d40556dd26d9e6e4a00
This commit is contained in:
parent
29b807d660
commit
4d33451758
@ -92,8 +92,7 @@ Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &read
|
|||||||
part_count_ =
|
part_count_ =
|
||||||
std::accumulate(ready_parts.begin(), ready_parts.end(), 0, [](auto a, auto b) { return max(a, b + 1); });
|
std::accumulate(ready_parts.begin(), ready_parts.end(), 0, [](auto a, auto b) { return max(a, b + 1); });
|
||||||
|
|
||||||
init_common(ready_parts);
|
return init_common(ready_parts);
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
|
Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
|
||||||
@ -133,8 +132,7 @@ Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, s
|
|||||||
<< tag("part_size_", part_size_) << tag("ready_parts", ready_parts.size());
|
<< tag("part_size_", part_size_) << tag("ready_parts", ready_parts.size());
|
||||||
part_count_ = static_cast<int>(calc_part_count(size_, part_size_));
|
part_count_ = static_cast<int>(calc_part_count(size_, part_size_));
|
||||||
|
|
||||||
init_common(ready_parts);
|
return init_common(ready_parts);
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool PartsManager::unchecked_ready() {
|
bool PartsManager::unchecked_ready() {
|
||||||
@ -304,6 +302,7 @@ Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
|
|||||||
} else {
|
} else {
|
||||||
part_count_ = static_cast<int>(size / part_size_);
|
part_count_ = static_cast<int>(size / part_size_);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_CHECK(static_cast<size_t>(part_count_) >= part_status_.size())
|
LOG_CHECK(static_cast<size_t>(part_count_) >= part_status_.size())
|
||||||
<< size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size();
|
<< size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size();
|
||||||
part_status_.resize(part_count_);
|
part_status_.resize(part_count_);
|
||||||
@ -446,7 +445,7 @@ int32 PartsManager::get_part_count() const {
|
|||||||
return part_count_;
|
return part_count_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void PartsManager::init_common(const std::vector<int> &ready_parts) {
|
Status PartsManager::init_common(const std::vector<int> &ready_parts) {
|
||||||
ready_size_ = 0;
|
ready_size_ = 0;
|
||||||
streaming_ready_size_ = 0;
|
streaming_ready_size_ = 0;
|
||||||
pending_count_ = 0;
|
pending_count_ = 0;
|
||||||
@ -457,9 +456,13 @@ void PartsManager::init_common(const std::vector<int> &ready_parts) {
|
|||||||
for (auto i : ready_parts) {
|
for (auto i : ready_parts) {
|
||||||
LOG_CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_) << tag("size", size_)
|
LOG_CHECK(0 <= i && i < part_count_) << tag("i", i) << tag("part_count", part_count_) << tag("size", size_)
|
||||||
<< tag("part_size", part_size_) << tag("known_prefix_flag", known_prefix_flag_)
|
<< tag("part_size", part_size_) << tag("known_prefix_flag", known_prefix_flag_)
|
||||||
|
<< tag("known_prefix_size", known_prefix_size_)
|
||||||
<< tag("real part_count",
|
<< tag("real part_count",
|
||||||
std::accumulate(ready_parts.begin(), ready_parts.end(), 0,
|
std::accumulate(ready_parts.begin(), ready_parts.end(), 0,
|
||||||
[](auto a, auto b) { return max(a, b + 1); }));
|
[](auto a, auto b) { return max(a, b + 1); }));
|
||||||
|
if (known_prefix_flag_ && i >= static_cast<int>(known_prefix_size_ / part_size_)) {
|
||||||
|
return Status::Error("FILE_UPLOAD_RESTART");
|
||||||
|
}
|
||||||
part_status_[i] = PartStatus::Ready;
|
part_status_[i] = PartStatus::Ready;
|
||||||
bitmask_.set(i);
|
bitmask_.set(i);
|
||||||
auto part = get_part(i);
|
auto part = get_part(i);
|
||||||
@ -467,6 +470,8 @@ void PartsManager::init_common(const std::vector<int> &ready_parts) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
checked_prefix_size_ = get_ready_prefix_count() * narrow_cast<int64>(part_size_);
|
checked_prefix_size_ = get_ready_prefix_count() * narrow_cast<int64>(part_size_);
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
void PartsManager::set_need_check() {
|
void PartsManager::set_need_check() {
|
||||||
|
@ -87,7 +87,7 @@ class PartsManager {
|
|||||||
Bitmask bitmask_;
|
Bitmask bitmask_;
|
||||||
bool use_part_count_limit_;
|
bool use_part_count_limit_;
|
||||||
|
|
||||||
void init_common(const vector<int> &ready_parts);
|
Status init_common(const vector<int> &ready_parts);
|
||||||
Status init_known_prefix(int64 known_prefix, size_t part_size,
|
Status init_known_prefix(int64 known_prefix, size_t part_size,
|
||||||
const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
||||||
Status init_no_size(size_t part_size, const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
Status init_no_size(size_t part_size, const std::vector<int> &ready_parts) TD_WARN_UNUSED_RESULT;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user