Separate Download/Generate/Upload queries.

This commit is contained in:
levlam 2024-07-24 16:10:13 +03:00
parent 016663d807
commit 2760db1fe4
2 changed files with 140 additions and 97 deletions

View File

@ -84,24 +84,30 @@ StringBuilder &operator<<(StringBuilder &string_builder, const NewRemoteFileLoca
return string_builder << " from " << location.full_source;
}
StringBuilder &operator<<(StringBuilder &string_builder, FileManager::Query::Type type) {
StringBuilder &operator<<(StringBuilder &string_builder, FileManager::DownloadQuery::Type type) {
switch (type) {
case FileManager::Query::Type::UploadByHash:
return string_builder << "UploadByHash";
case FileManager::Query::Type::UploadWaitFileReference:
return string_builder << "UploadWaitFileReference";
case FileManager::Query::Type::Upload:
return string_builder << "Upload";
case FileManager::Query::Type::DownloadWaitFileReference:
case FileManager::DownloadQuery::Type::DownloadWaitFileReference:
return string_builder << "DownloadWaitFileReference";
case FileManager::Query::Type::DownloadReloadDialog:
case FileManager::DownloadQuery::Type::DownloadReloadDialog:
return string_builder << "DownloadReloadDialog";
case FileManager::Query::Type::Download:
case FileManager::DownloadQuery::Type::Download:
return string_builder << "Download";
case FileManager::Query::Type::SetContent:
case FileManager::DownloadQuery::Type::SetContent:
return string_builder << "SetContent";
case FileManager::Query::Type::Generate:
return string_builder << "Generate";
default:
UNREACHABLE();
return string_builder << "Unknown";
}
}
StringBuilder &operator<<(StringBuilder &string_builder, FileManager::UploadQuery::Type type) {
switch (type) {
case FileManager::UploadQuery::Type::UploadByHash:
return string_builder << "UploadByHash";
case FileManager::UploadQuery::Type::UploadWaitFileReference:
return string_builder << "UploadWaitFileReference";
case FileManager::UploadQuery::Type::Upload:
return string_builder << "Upload";
default:
UNREACHABLE();
return string_builder << "Unknown";
@ -2247,7 +2253,8 @@ bool FileManager::set_content(FileId file_id, BufferSlice bytes) {
node->set_download_priority(FROM_BYTES_PRIORITY);
FileDownloadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::SetContent});
FileDownloadManager::QueryId query_id =
download_queries_.create(DownloadQuery{file_id, DownloadQuery::Type::SetContent});
node->download_id_ = query_id;
node->is_download_started_ = true;
send_closure(file_download_manager_, &FileDownloadManager::from_bytes, query_id,
@ -2557,7 +2564,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
if (node->need_reload_photo_ && file_view.may_reload_photo()) {
LOG(INFO) << "Reload photo from file " << node->main_file_id_;
FileDownloadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::DownloadReloadDialog});
download_queries_.create(DownloadQuery{file_id, DownloadQuery::Type::DownloadReloadDialog});
node->download_id_ = query_id;
context_->reload_photo(file_view.remote_location().get_source(),
PromiseCreator::lambda([actor_id = actor_id(this), query_id, file_id](Result<Unit> res) {
@ -2579,7 +2586,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
if (!file_view.has_active_download_remote_location()) {
VLOG(file_references) << "Do not have valid file_reference for file " << file_id;
FileDownloadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference});
download_queries_.create(DownloadQuery{file_id, DownloadQuery::Type::DownloadWaitFileReference});
node->download_id_ = query_id;
if (node->download_was_update_file_reference_) {
return on_download_error(query_id, Status::Error("Can't download file: have no valid file reference"));
@ -2600,7 +2607,8 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
return;
}
FileDownloadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Download});
FileDownloadManager::QueryId query_id =
download_queries_.create(DownloadQuery{file_id, DownloadQuery::Type::Download});
node->download_id_ = query_id;
node->is_download_started_ = false;
LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from "
@ -2994,7 +3002,7 @@ void FileManager::run_generate(FileNodePtr node) {
return;
}
FileGenerateManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Generate});
FileGenerateManager::QueryId query_id = generate_queries_.create(GenerateQuery{file_id});
node->generate_id_ = query_id;
send_closure(file_generate_manager_, &FileGenerateManager::generate_file, query_id, *node->generate_, node->local_,
node->suggested_path(), [file_manager = this, query_id] {
@ -3099,7 +3107,7 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
if (file_view.has_alive_remote_location() && !file_view.has_active_upload_remote_location() &&
can_reuse_remote_file(file_view.get_type())) {
FileUploadManager::QueryId query_id =
queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference});
upload_queries_.create(UploadQuery{file_id, UploadQuery::Type::UploadWaitFileReference});
node->upload_id_ = query_id;
if (node->upload_was_update_file_reference_) {
return on_upload_error(query_id, Status::Error("Can't upload file: have no valid file reference"));
@ -3116,7 +3124,7 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
if (!node->remote_.partial && node->get_by_hash_) {
LOG(INFO) << "Get file " << node->main_file_id_ << " by hash";
FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash});
FileUploadManager::QueryId query_id = upload_queries_.create(UploadQuery{file_id, UploadQuery::Type::UploadByHash});
node->upload_id_ = query_id;
send_closure(file_upload_manager_, &FileUploadManager::upload_by_hash, query_id, node->local_.full(), node->size_,
@ -3132,7 +3140,7 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
expected_size = 10 << 20;
}
FileUploadManager::QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload});
FileUploadManager::QueryId query_id = upload_queries_.create(UploadQuery{file_id, UploadQuery::Type::Upload});
node->upload_id_ = query_id;
send_closure(file_upload_manager_, &FileUploadManager::upload, query_id, node->local_,
node->remote_.partial_or_empty(), expected_size, node->encryption_key_, new_priority,
@ -3752,7 +3760,7 @@ void FileManager::on_start_download(FileDownloadManager::QueryId query_id) {
return;
}
auto query = queries_container_.get(query_id);
auto query = download_queries_.get(query_id);
CHECK(query != nullptr);
auto file_id = query->file_id_;
@ -3775,7 +3783,7 @@ void FileManager::on_partial_download(FileDownloadManager::QueryId query_id, Par
return;
}
auto query = queries_container_.get(query_id);
auto query = download_queries_.get(query_id);
CHECK(query != nullptr);
auto file_id = query->file_id_;
@ -3804,7 +3812,7 @@ void FileManager::on_hash(FileUploadManager::QueryId query_id, string hash) {
return;
}
auto query = queries_container_.get(query_id);
auto query = upload_queries_.get(query_id);
CHECK(query != nullptr);
auto file_id = query->file_id_;
@ -3827,7 +3835,7 @@ void FileManager::on_partial_upload(FileUploadManager::QueryId query_id, Partial
return;
}
auto query = queries_container_.get(query_id);
auto query = upload_queries_.get(query_id);
CHECK(query != nullptr);
auto file_id = query->file_id_;
@ -3854,9 +3862,9 @@ void FileManager::on_download_ok(FileDownloadManager::QueryId query_id, FullLoca
return;
}
Query query;
DownloadQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_download_query(query_id);
auto file_id = query.file_id_;
LOG(INFO) << "ON DOWNLOAD OK of " << (is_new ? "new" : "checked") << " file " << file_id << " of size " << size;
auto r_new_file_id = register_local(std::move(local), DialogId(), size, false, false, true, file_id);
@ -3881,7 +3889,7 @@ void FileManager::on_upload_ok(FileUploadManager::QueryId query_id, FileType fil
}
CHECK(partial_remote.ready_part_count_ == partial_remote.part_count_);
auto some_file_id = finish_query(static_cast<Container<Query>::Id>(query_id)).first.file_id_;
auto some_file_id = finish_upload_query(query_id).first.file_id_;
LOG(INFO) << "ON UPLOAD OK file " << some_file_id << " of size " << size;
auto file_node = get_file_node(some_file_id);
@ -3958,7 +3966,7 @@ void FileManager::on_upload_full_ok(FileUploadManager::QueryId query_id, FullRem
return;
}
auto file_id = finish_query(static_cast<Container<Query>::Id>(query_id)).first.file_id_;
auto file_id = finish_upload_query(query_id).first.file_id_;
LOG(INFO) << "ON UPLOAD FULL OK for file " << file_id;
auto new_file_id = register_remote(std::move(remote), FileLocationSource::FromServer, DialogId(), 0, 0, "");
LOG_STATUS(merge(new_file_id, file_id));
@ -3970,7 +3978,7 @@ void FileManager::on_partial_generate(FileGenerateManager::QueryId query_id, Par
return;
}
auto query = queries_container_.get(query_id);
auto query = generate_queries_.get(query_id);
CHECK(query != nullptr);
auto file_id = query->file_id_;
@ -4006,9 +4014,9 @@ void FileManager::on_generate_ok(FileGenerateManager::QueryId query_id, FullLoca
return;
}
Query query;
GenerateQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_generate_query(query_id);
auto generate_file_id = query.file_id_;
LOG(INFO) << "Receive on_generate_ok for file " << generate_file_id << ": " << local;
@ -4050,9 +4058,9 @@ void FileManager::on_download_error(FileDownloadManager::QueryId query_id, Statu
return;
}
Query query;
DownloadQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_download_query(query_id);
auto node = get_file_node(query.file_id_);
if (!node) {
LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status;
@ -4066,9 +4074,9 @@ void FileManager::on_generate_error(FileGenerateManager::QueryId query_id, Statu
return;
}
Query query;
GenerateQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_generate_query(query_id);
auto node = get_file_node(query.file_id_);
if (!node) {
LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status;
@ -4082,16 +4090,16 @@ void FileManager::on_upload_error(FileUploadManager::QueryId query_id, Status st
return;
}
Query query;
UploadQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_upload_query(query_id);
auto node = get_file_node(query.file_id_);
if (!node) {
LOG(ERROR) << "Can't find file node for " << query.file_id_ << " " << status;
return;
}
if (query.type_ == Query::Type::UploadByHash && !G()->close_flag()) {
if (query.type_ == UploadQuery::Type::UploadByHash && !G()->close_flag()) {
LOG(INFO) << "Upload By Hash failed: " << status << ", restart upload";
node->get_by_hash_ = false;
return run_upload(node, {});
@ -4099,7 +4107,7 @@ void FileManager::on_upload_error(FileUploadManager::QueryId query_id, Status st
on_upload_error_impl(node, query.type_, was_active, std::move(status));
}
void FileManager::on_download_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status) {
void FileManager::on_download_error_impl(FileNodePtr node, DownloadQuery::Type type, bool was_active, Status status) {
SCOPE_EXIT {
try_flush_node(node, "on_error_impl");
};
@ -4216,7 +4224,7 @@ void FileManager::on_generate_error_impl(FileNodePtr node, bool was_active, Stat
on_file_load_error(node, std::move(status));
}
void FileManager::on_upload_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status) {
void FileManager::on_upload_error_impl(FileNodePtr node, UploadQuery::Type type, bool was_active, Status status) {
SCOPE_EXIT {
try_flush_node(node, "on_upload_error_impl");
};
@ -4290,39 +4298,53 @@ void FileManager::on_file_load_error(FileNodePtr node, Status status) {
}
}
std::pair<FileManager::Query, bool> FileManager::finish_query(Container<Query>::Id query_id) {
SCOPE_EXIT {
queries_container_.erase(query_id);
};
auto query = queries_container_.get(query_id);
std::pair<FileManager::DownloadQuery, bool> FileManager::finish_download_query(FileDownloadManager::QueryId query_id) {
auto query = download_queries_.get(query_id);
CHECK(query != nullptr);
auto res = *query;
download_queries_.erase(query_id);
auto node = get_file_node(res.file_id_);
if (!node) {
return std::make_pair(res, false);
}
bool was_active = false;
if (node->generate_id_ == query_id) {
node->generate_id_ = 0;
node->generate_was_update_ = false;
node->set_generate_priority(0, 0);
was_active = true;
}
if (node->download_id_ == query_id) {
if (node && node->download_id_ == query_id) {
node->download_id_ = 0;
node->download_was_update_file_reference_ = false;
node->is_download_started_ = false;
node->set_download_priority(0);
was_active = true;
return std::make_pair(res, true);
}
if (node->upload_id_ == query_id) {
return std::make_pair(res, false);
}
std::pair<FileManager::GenerateQuery, bool> FileManager::finish_generate_query(FileGenerateManager::QueryId query_id) {
auto query = generate_queries_.get(query_id);
CHECK(query != nullptr);
auto res = *query;
generate_queries_.erase(query_id);
auto node = get_file_node(res.file_id_);
if (node && node->generate_id_ == query_id) {
node->generate_id_ = 0;
node->generate_was_update_ = false;
node->set_generate_priority(0, 0);
return std::make_pair(res, true);
}
return std::make_pair(res, false);
}
std::pair<FileManager::UploadQuery, bool> FileManager::finish_upload_query(FileUploadManager::QueryId query_id) {
auto query = upload_queries_.get(query_id);
CHECK(query != nullptr);
auto res = *query;
upload_queries_.erase(query_id);
auto node = get_file_node(res.file_id_);
if (node && node->upload_id_ == query_id) {
node->upload_id_ = 0;
node->upload_was_update_file_reference_ = false;
node->set_upload_priority(0);
was_active = true;
return std::make_pair(res, true);
}
return std::make_pair(res, was_active);
return std::make_pair(res, false);
}
FullRemoteFileLocation *FileManager::get_remote(int32 key) {
@ -4349,32 +4371,39 @@ void FileManager::hangup() {
file_generate_manager_.reset();
file_download_manager_.reset();
file_upload_manager_.reset();
while (!queries_container_.empty()) {
auto query_ids = queries_container_.ids();
while (!download_queries_.empty()) {
auto query_ids = download_queries_.ids();
for (auto query_id : query_ids) {
Query query;
DownloadQuery query;
bool was_active;
std::tie(query, was_active) = finish_query(static_cast<Container<Query>::Id>(query_id));
std::tie(query, was_active) = finish_download_query(static_cast<FileDownloadManager::QueryId>(query_id));
auto node = get_file_node(query.file_id_);
if (node) {
switch (query.type_) {
case Query::Type::UploadByHash:
case Query::Type::UploadWaitFileReference:
case Query::Type::Upload:
on_upload_error_impl(node, query.type_, was_active, Global::request_aborted_error());
break;
case Query::Type::DownloadWaitFileReference:
case Query::Type::DownloadReloadDialog:
case Query::Type::Download:
case Query::Type::SetContent:
on_download_error_impl(node, query.type_, was_active, Global::request_aborted_error());
break;
case Query::Type::Generate:
on_generate_error_impl(node, was_active, Global::request_aborted_error());
break;
default:
UNREACHABLE();
}
on_download_error_impl(node, query.type_, was_active, Global::request_aborted_error());
}
}
}
while (!generate_queries_.empty()) {
auto query_ids = generate_queries_.ids();
for (auto query_id : query_ids) {
GenerateQuery query;
bool was_active;
std::tie(query, was_active) = finish_generate_query(static_cast<FileGenerateManager::QueryId>(query_id));
auto node = get_file_node(query.file_id_);
if (node) {
on_generate_error_impl(node, was_active, Global::request_aborted_error());
}
}
}
while (!upload_queries_.empty()) {
auto query_ids = upload_queries_.ids();
for (auto query_id : query_ids) {
UploadQuery query;
bool was_active;
std::tie(query, was_active) = finish_upload_query(static_cast<FileUploadManager::QueryId>(query_id));
auto node = get_file_node(query.file_id_);
if (node) {
on_upload_error_impl(node, query.type_, was_active, Global::request_aborted_error());
}
}
}

View File

@ -630,22 +630,30 @@ class FileManager final : public Actor {
using FileNodeId = int32;
class Query {
public:
struct DownloadQuery {
FileId file_id_;
enum class Type : int32 {
DownloadWaitFileReference,
DownloadReloadDialog,
Download,
SetContent,
} type_;
};
friend StringBuilder &operator<<(StringBuilder &string_builder, DownloadQuery::Type type);
struct GenerateQuery {
FileId file_id_;
};
struct UploadQuery {
FileId file_id_;
enum class Type : int32 {
UploadByHash,
UploadWaitFileReference,
Upload,
DownloadWaitFileReference,
DownloadReloadDialog,
Download,
SetContent,
Generate
} type_;
};
friend StringBuilder &operator<<(StringBuilder &string_builder, Query::Type type);
friend StringBuilder &operator<<(StringBuilder &string_builder, UploadQuery::Type type);
struct FileIdInfo {
FileNodeId node_id_{0};
@ -699,7 +707,9 @@ class FileManager final : public Actor {
ActorOwn<FileUploadManager> file_upload_manager_;
ActorOwn<FileGenerateManager> file_generate_manager_;
Container<Query> queries_container_;
Container<DownloadQuery> download_queries_;
Container<GenerateQuery> generate_queries_;
Container<UploadQuery> upload_queries_;
bool is_closed_ = false;
@ -775,7 +785,7 @@ class FileManager final : public Actor {
int64 ready_size, int64 size);
void on_download_ok(FileDownloadManager::QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new);
void on_download_error(FileDownloadManager::QueryId query_id, Status status);
void on_download_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status);
void on_download_error_impl(FileNodePtr node, DownloadQuery::Type type, bool was_active, Status status);
void on_hash(FileUploadManager::QueryId query_id, string hash);
void on_partial_upload(FileUploadManager::QueryId query_id, PartialRemoteFileLocation partial_remote,
@ -784,7 +794,7 @@ class FileManager final : public Actor {
int64 size);
void on_upload_full_ok(FileUploadManager::QueryId query_id, FullRemoteFileLocation remote);
void on_upload_error(FileUploadManager::QueryId query_id, Status status);
void on_upload_error_impl(FileNodePtr node, Query::Type type, bool was_active, Status status);
void on_upload_error_impl(FileNodePtr node, UploadQuery::Type type, bool was_active, Status status);
void on_partial_generate(FileGenerateManager::QueryId, PartialLocalFileLocation partial_local, int64 expected_size);
void on_generate_ok(FileGenerateManager::QueryId, FullLocalFileLocation local);
@ -793,7 +803,11 @@ class FileManager final : public Actor {
void on_file_load_error(FileNodePtr node, Status status);
std::pair<Query, bool> finish_query(Container<Query>::Id query_id);
std::pair<DownloadQuery, bool> finish_download_query(FileDownloadManager::QueryId query_id);
std::pair<GenerateQuery, bool> finish_generate_query(FileGenerateManager::QueryId query_id);
std::pair<UploadQuery, bool> finish_upload_query(FileUploadManager::QueryId query_id);
FullRemoteFileLocation *get_remote(int32 key);