FileManager: multiple fixes

GitOrigin-RevId: 2b3da1b87529f8a49e9e4da3e0cd490ae2790926
This commit is contained in:
Arseny Smirnov 2018-01-30 15:33:02 +03:00 committed by levlam
parent c329826c5d
commit 74bdbedb5a
6 changed files with 55 additions and 20 deletions

View File

@ -54,7 +54,12 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) {
return; return;
} }
auto prefix_info = r_prefix_info.move_as_ok(); auto prefix_info = r_prefix_info.move_as_ok();
parts_manager_.set_known_prefix(narrow_cast<size_t>(prefix_info.size), prefix_info.is_ready); auto status = parts_manager_.set_known_prefix(narrow_cast<size_t>(prefix_info.size), prefix_info.is_ready);
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
loop(); loop();
} }
@ -67,11 +72,12 @@ void FileLoader::start_up() {
} }
auto file_info = r_file_info.ok(); auto file_info = r_file_info.ok();
auto size = file_info.size; auto size = file_info.size;
auto expected_size = std::max(size, file_info.expected_size);
bool is_size_final = file_info.is_size_final; bool is_size_final = file_info.is_size_final;
auto part_size = file_info.part_size; auto part_size = file_info.part_size;
auto &ready_parts = file_info.ready_parts; auto &ready_parts = file_info.ready_parts;
auto use_part_count_limit = file_info.use_part_count_limit; auto use_part_count_limit = file_info.use_part_count_limit;
auto status = parts_manager_.init(size, is_size_final, part_size, ready_parts, use_part_count_limit); auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit);
if (status.is_error()) { if (status.is_error()) {
on_error(std::move(status)); on_error(std::move(status));
stop_flag_ = true; stop_flag_ = true;

View File

@ -48,6 +48,7 @@ class FileLoader : public FileLoaderActor {
}; };
struct FileInfo { struct FileInfo {
int64 size; int64 size;
int64 expected_size = 0;
bool is_size_final; bool is_size_final;
int32 part_size; int32 part_size;
std::vector<int> ready_parts; std::vector<int> ready_parts;

View File

@ -639,6 +639,9 @@ Result<FileId> FileManager::register_file(FileData data, FileLocationSource file
if (status.is_error()) { if (status.is_error()) {
LOG(WARNING) << "Invalid local location: " << status << " from " << source; LOG(WARNING) << "Invalid local location: " << status << " from " << source;
data.local_ = LocalFileLocation(); data.local_ = LocalFileLocation();
if (data.remote_.type() == RemoteFileLocation::Type::Partial) {
data.remote_ = {};
}
if (!has_remote && !has_generate) { if (!has_remote && !has_generate) {
return std::move(status); return std::move(status);
@ -2178,7 +2181,7 @@ void FileManager::on_error_impl(FileNode *node, FileManager::Query::Type type, b
<< ". File type is " << file_type_name[static_cast<int32>(FileView(node).get_type())]; << ". File type is " << file_type_name[static_cast<int32>(FileView(node).get_type())];
if (status.code() == 0) { if (status.code() == 0) {
// Remove partial locations // Remove partial locations
if (node->local_.type() == LocalFileLocation::Type::Partial) { if (node->local_.type() == LocalFileLocation::Type::Partial && status.message() != "FILE_UPLOAD_RESTART") {
LOG(INFO) << "Unlink file " << node->local_.partial().path_; LOG(INFO) << "Unlink file " << node->local_.partial().path_;
unlink(node->local_.partial().path_).ignore(); unlink(node->local_.partial().path_).ignore();
node->set_local_location(LocalFileLocation(), 0); node->set_local_location(LocalFileLocation(), 0);

View File

@ -223,7 +223,8 @@ Result<std::pair<NetQueryPtr, bool>> FileUploader::start_part(Part part, int32 p
NetQueryPtr net_query; NetQueryPtr net_query;
if (big_flag_) { if (big_flag_) {
auto query = telegram_api::upload_saveBigFilePart(file_id_, part.id, part_count, std::move(bytes)); auto query =
telegram_api::upload_saveBigFilePart(file_id_, part.id, local_is_ready_ ? part_count : -1, std::move(bytes));
net_query = G()->net_query_creator().create(create_storer(query), DcId::main(), NetQuery::Type::Upload, net_query = G()->net_query_creator().create(create_storer(query), DcId::main(), NetQuery::Type::Upload,
NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off); NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off);
} else { } else {

View File

@ -16,6 +16,14 @@
namespace td { namespace td {
/*** PartsManager ***/ /*** PartsManager ***/
namespace {
int64 calc_parts_count(int64 size, int64 part_size) {
CHECK(part_size != 0);
return (size + part_size - 1) / part_size;
}
} // namespace
Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, const std::vector<int> &ready_parts) { Status PartsManager::init_known_prefix(int64 known_prefix, size_t part_size, const std::vector<int> &ready_parts) {
known_prefix_flag_ = true; known_prefix_flag_ = true;
known_prefix_size_ = known_prefix; known_prefix_size_ = known_prefix;
@ -32,6 +40,14 @@ Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &read
part_size_ = part_size; part_size_ = part_size;
} else { } else {
part_size_ = 32 * (1 << 10); part_size_ = 32 * (1 << 10);
while (use_part_count_limit_ && calc_parts_count(expected_size_, part_size_) > MAX_PART_COUNT) {
part_size_ *= 2;
CHECK(part_size_ <= MAX_PART_SIZE);
}
// just in case if expected_size_ is wrong
if (part_size_ < MAX_PART_SIZE) {
part_size_ *= 2;
}
} }
part_count_ = 0; part_count_ = 0;
if (known_prefix_flag_) { if (known_prefix_flag_) {
@ -44,40 +60,42 @@ Status PartsManager::init_no_size(size_t part_size, const std::vector<int> &read
return Status::OK(); return Status::OK();
} }
Status PartsManager::init(int64 size, bool is_size_final, size_t part_size, const std::vector<int> &ready_parts, Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
bool use_part_count_limit) { const std::vector<int> &ready_parts, bool use_part_count_limit) {
CHECK(expected_size >= size);
use_part_count_limit_ = use_part_count_limit; use_part_count_limit_ = use_part_count_limit;
expected_size_ = expected_size;
if (expected_size_ > MAX_FILE_SIZE) {
return Status::Error("Too big file");
}
if (!is_size_final) { if (!is_size_final) {
return init_known_prefix(size, part_size, ready_parts); return init_known_prefix(size, part_size, ready_parts);
} }
if (size == 0) { if (size == 0) {
return init_no_size(part_size, ready_parts); return init_no_size(part_size, ready_parts);
} }
if (size > MAX_FILE_SIZE) {
return Status::Error("Too big file");
}
CHECK(size > 0) << tag("size", size); CHECK(size > 0) << tag("size", size);
unknown_size_flag_ = false; unknown_size_flag_ = false;
size_ = size; size_ = size;
if (part_size != 0) { if (part_size != 0) {
part_size_ = part_size; part_size_ = part_size;
if (use_part_count_limit_ && (size_ + part_size_ - 1) / part_size_ > MAX_PART_COUNT) { if (use_part_count_limit_ && calc_parts_count(expected_size_, part_size_) > MAX_PART_COUNT) {
return Status::Error("FILE_UPLOAD_RESTART"); return Status::Error("FILE_UPLOAD_RESTART");
} }
} else { } else {
// TODO choose part_size_ depending on size // TODO choose part_size_ depending on size
part_size_ = 64 * (1 << 10); part_size_ = 64 * (1 << 10);
while (use_part_count_limit && (size_ + part_size_ - 1) / part_size_ > MAX_PART_COUNT) { while (use_part_count_limit && calc_parts_count(expected_size_, part_size_) > MAX_PART_COUNT) {
part_size_ *= 2; part_size_ *= 2;
CHECK(part_size_ <= MAX_PART_SIZE); CHECK(part_size_ <= MAX_PART_SIZE);
} }
} }
CHECK(1 <= size_) << tag("size_", size_); CHECK(1 <= size_) << tag("size_", size_);
CHECK(!use_part_count_limit || (size_ + part_size_ - 1) / part_size_ <= MAX_PART_COUNT) CHECK(!use_part_count_limit || calc_parts_count(expected_size_, part_size_) <= MAX_PART_COUNT)
<< tag("size_", size_) << tag("is_size_final", is_size_final) << tag("part_size_", part_size_) << tag("size_", size_) << tag("expected_size", size_) << tag("is_size_final", is_size_final)
<< tag("ready_parts", ready_parts.size()); << tag("part_size_", part_size_) << tag("ready_parts", ready_parts.size());
part_count_ = static_cast<int>((size + part_size_ - 1) / part_size_); part_count_ = static_cast<int>(calc_parts_count(size_, part_size_));
init_common(ready_parts); init_common(ready_parts);
return Status::OK(); return Status::OK();
@ -147,14 +165,15 @@ Result<Part> PartsManager::start_part() {
return get_part(id); return get_part(id);
} }
void PartsManager::set_known_prefix(size_t size, bool is_ready) { Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
CHECK(known_prefix_flag_); CHECK(known_prefix_flag_);
CHECK(size >= static_cast<size_t>(known_prefix_size_)); CHECK(size >= static_cast<size_t>(known_prefix_size_));
known_prefix_size_ = narrow_cast<int64>(size); known_prefix_size_ = narrow_cast<int64>(size);
expected_size_ = std::max(known_prefix_size_, expected_size_);
CHECK(static_cast<size_t>(part_count_) == part_status_.size()); CHECK(static_cast<size_t>(part_count_) == part_status_.size());
if (is_ready) { if (is_ready) {
part_count_ = static_cast<int>((size + part_size_ - 1) / part_size_); part_count_ = static_cast<int>(calc_parts_count(size, part_size_));
size_ = narrow_cast<int64>(size); size_ = narrow_cast<int64>(size);
unknown_size_flag_ = false; unknown_size_flag_ = false;
@ -164,6 +183,10 @@ void PartsManager::set_known_prefix(size_t size, bool is_ready) {
CHECK(static_cast<size_t>(part_count_) >= part_status_.size()) CHECK(static_cast<size_t>(part_count_) >= part_status_.size())
<< size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size(); << size << " " << is_ready << " " << part_count_ << " " << part_size_ << " " << part_status_.size();
part_status_.resize(part_count_); part_status_.resize(part_count_);
if (use_part_count_limit_ && calc_parts_count(expected_size_, part_size_) > MAX_PART_COUNT) {
return Status::Error("FILE_UPLOAD_RESTART");
}
return Status::OK();
} }
Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) { Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) {

View File

@ -20,8 +20,8 @@ struct Part {
class PartsManager { class PartsManager {
public: public:
Status init(int64 size, bool is_size_final, size_t part_size, const std::vector<int> &ready_parts, Status init(int64 size, int64 expected_size, bool is_size_final, size_t part_size,
bool use_part_count_limit) TD_WARN_UNUSED_RESULT; const std::vector<int> &ready_parts, bool use_part_count_limit) TD_WARN_UNUSED_RESULT;
bool ready(); bool ready();
bool unchecked_ready(); bool unchecked_ready();
Status finish() TD_WARN_UNUSED_RESULT; Status finish() TD_WARN_UNUSED_RESULT;
@ -30,7 +30,7 @@ class PartsManager {
Result<Part> start_part() TD_WARN_UNUSED_RESULT; Result<Part> start_part() TD_WARN_UNUSED_RESULT;
Status on_part_ok(int32 id, size_t part_size, size_t actual_size) TD_WARN_UNUSED_RESULT; Status on_part_ok(int32 id, size_t part_size, size_t actual_size) TD_WARN_UNUSED_RESULT;
void on_part_failed(int32 id); void on_part_failed(int32 id);
void set_known_prefix(size_t size, bool is_ready); Status set_known_prefix(size_t size, bool is_ready);
void set_need_check(); void set_need_check();
void set_checked_prefix_size(int64 size); void set_checked_prefix_size(int64 size);
@ -58,6 +58,7 @@ class PartsManager {
int64 known_prefix_size_; int64 known_prefix_size_;
int64 size_; int64 size_;
int64 expected_size_;
int64 min_size_; int64 min_size_;
int64 max_size_; int64 max_size_;
bool unknown_size_flag_; bool unknown_size_flag_;