FileReferenceManager: new queries logic
GitOrigin-RevId: 82592baaed566ef28e044cd0dc9fe67b625ae2ab
This commit is contained in:
parent
8851b9d066
commit
a6e47943c3
@ -46,62 +46,185 @@ FileSourceId FileReferenceManager::create_message_file_source(FullMessageId full
|
|||||||
return source_id;
|
return source_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FileReferenceManager::update_file_reference(FileId file_id, vector<FileSourceId> file_source_ids,
|
void FileReferenceManager::add_file_source(NodeId node_id, FileSourceId file_source_id) {
|
||||||
Promise<> promise) {
|
VLOG(file_references) << "add_file_source: " << node_id << " " << file_source_id;
|
||||||
VLOG(file_references) << "Trying to load valid file_reference from server: " << file_id << " " << file_source_ids;
|
nodes_[node_id].file_source_ids.add(file_source_id);
|
||||||
MultiPromiseActorSafe mpas{"UpdateFileReferenceMultiPromiseActor"};
|
|
||||||
mpas.set_ignore_errors(true);
|
|
||||||
mpas.add_promise(std::move(promise));
|
|
||||||
auto lock = mpas.get_promise();
|
|
||||||
for (auto source_id : file_source_ids) {
|
|
||||||
auto index = static_cast<size_t>(source_id.get()) - 1;
|
|
||||||
CHECK(index < file_sources_.size());
|
|
||||||
|
|
||||||
auto new_promise = PromiseCreator::lambda([promise = mpas.get_promise(), file_id, source_id,
|
|
||||||
file_manager = G()->file_manager()](Result<Unit> result) mutable {
|
|
||||||
if (result.is_error() && result.error().code() != 429 && result.error().code() < 500 && !G()->close_flag()) {
|
|
||||||
VLOG(file_references) << "Invalid source id " << source_id << " " << result.error();
|
|
||||||
send_closure(file_manager, &FileManager::remove_file_source, file_id, source_id);
|
|
||||||
}
|
}
|
||||||
// NB: main promise must send closure to FileManager
|
void FileReferenceManager::remove_file_source(NodeId node_id, FileSourceId file_source_id) {
|
||||||
// So the closure will be executed only after the bad source id is removed
|
VLOG(file_references) << "remove_file_source: " << node_id << " " << file_source_id;
|
||||||
promise.set_value(Unit());
|
nodes_[node_id].file_source_ids.remove(file_source_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void merge(std::vector<Promise<>> &a, std::vector<Promise<>> &b) {
|
||||||
|
if (a.size() < b.size()) {
|
||||||
|
std::swap(a, b);
|
||||||
|
}
|
||||||
|
for (auto &x : b) {
|
||||||
|
a.push_back(std::move(x));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FileReferenceManager::merge(NodeId to_node_id, NodeId from_node_id) {
|
||||||
|
VLOG(file_references) << "merge: " << to_node_id << " " << from_node_id;
|
||||||
|
auto &to = nodes_[to_node_id];
|
||||||
|
auto &from = nodes_[from_node_id];
|
||||||
|
CHECK(!to.query || to.query->proxy.empty());
|
||||||
|
CHECK(!from.query || from.query->proxy.empty());
|
||||||
|
if (to.query || from.query) {
|
||||||
|
if (!to.query) {
|
||||||
|
to.query = make_unique<Query>();
|
||||||
|
to.query->generation = ++query_generation;
|
||||||
|
}
|
||||||
|
if (from.query) {
|
||||||
|
::td::merge(to.query->promises, from.query->promises);
|
||||||
|
to.query->active_queries += from.query->active_queries;
|
||||||
|
from.query->proxy = {to_node_id, to.query->generation};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
to.file_source_ids.merge(std::move(from.file_source_ids));
|
||||||
|
run_node(to_node_id);
|
||||||
|
run_node(from_node_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void FileReferenceManager::run_node(NodeId node_id) {
|
||||||
|
VLOG(file_references) << "run_node: " << node_id;
|
||||||
|
Node &node = nodes_[node_id];
|
||||||
|
if (!node.query) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (node.query->active_queries != 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (node.query->promises.empty()) {
|
||||||
|
node.query = {};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!node.file_source_ids.has_next()) {
|
||||||
|
for (auto &p : node.query->promises) {
|
||||||
|
p.set_value(Unit());
|
||||||
|
}
|
||||||
|
node.query = {};
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto file_source_id = node.file_source_ids.next();
|
||||||
|
send_query({node_id, node.query->generation}, file_source_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void FileReferenceManager::send_query(Destination dest, FileSourceId file_source_id) {
|
||||||
|
VLOG(file_references) << "send_query " << dest.node_id << " " << dest.generation << " " << file_source_id;
|
||||||
|
auto &node = nodes_[dest.node_id];
|
||||||
|
node.query->active_queries++;
|
||||||
|
|
||||||
|
auto promise = PromiseCreator::lambda([dest, file_source_id, file_reference_manager = G()->file_reference_manager(),
|
||||||
|
file_manager = G()->file_manager()](Result<Unit> result) mutable {
|
||||||
|
auto new_promise =
|
||||||
|
PromiseCreator::lambda([dest, file_source_id, file_reference_manager](Result<Unit> result) mutable {
|
||||||
|
Status status;
|
||||||
|
if (result.is_error()) {
|
||||||
|
status = result.move_as_error();
|
||||||
|
}
|
||||||
|
send_closure(file_reference_manager, &FileReferenceManager::on_query_result, dest, file_source_id,
|
||||||
|
std::move(status), 0);
|
||||||
});
|
});
|
||||||
|
if (result.is_error()) {
|
||||||
|
new_promise.set_result(std::move(result));
|
||||||
|
}
|
||||||
|
send_lambda(file_manager, [file_manager, dest, new_promise = std::move(new_promise)]() mutable {
|
||||||
|
auto view = file_manager.get_actor_unsafe()->get_file_view(dest.node_id);
|
||||||
|
if (view.has_active_remote_location()) {
|
||||||
|
new_promise.set_value({});
|
||||||
|
} else {
|
||||||
|
new_promise.set_error(Status::Error("No active remote location"));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
auto index = static_cast<size_t>(file_source_id.get()) - 1;
|
||||||
|
CHECK(index < file_sources_.size());
|
||||||
file_sources_[index].visit(overloaded(
|
file_sources_[index].visit(overloaded(
|
||||||
[&](const FileSourceMessage &source) {
|
[&](const FileSourceMessage &source) {
|
||||||
send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server,
|
send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server,
|
||||||
vector<FullMessageId>{source.full_message_id}, std::move(new_promise), nullptr);
|
vector<FullMessageId>{source.full_message_id}, std::move(promise), nullptr);
|
||||||
},
|
},
|
||||||
[&](const FileSourceUserPhoto &source) {
|
[&](const FileSourceUserPhoto &source) {
|
||||||
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_user_photo_from_server, source.user_id,
|
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_user_photo_from_server, source.user_id,
|
||||||
// source.photo_id, std::move(new_promise));
|
// source.photo_id, std::move(promise));
|
||||||
},
|
},
|
||||||
[&](const FileSourceChatPhoto &source) {
|
[&](const FileSourceChatPhoto &source) {
|
||||||
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_chat_photo_from_server, source.chat_id,
|
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_chat_photo_from_server, source.chat_id,
|
||||||
// std::move(new_promise));
|
// std::move(promise));
|
||||||
},
|
},
|
||||||
[&](const FileSourceChannelPhoto &source) {
|
[&](const FileSourceChannelPhoto &source) {
|
||||||
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_channel_photo_from_server,
|
// send_closure_later(G()->contacts_manager(), &ContactsManager::get_channel_photo_from_server,
|
||||||
// source.channel_id, std::move(new_promise));
|
// source.channel_id, std::move(promise));
|
||||||
},
|
},
|
||||||
[&](const FileSourceWallpapers &source) {
|
[&](const FileSourceWallpapers &source) {
|
||||||
// send_closure_later(G()->wallpaper_manager(), &WallpaperManager::get_wallpapers_from_server,
|
// send_closure_later(G()->wallpaper_manager(), &WallpaperManager::get_wallpapers_from_server,
|
||||||
// std::move(new_promise));
|
// std::move(promise));
|
||||||
},
|
},
|
||||||
[&](const FileSourceWebPage &source) {
|
[&](const FileSourceWebPage &source) {
|
||||||
send_closure_later(G()->web_pages_manager(), &WebPagesManager::reload_web_page_by_url, source.url,
|
send_closure_later(G()->web_pages_manager(), &WebPagesManager::reload_web_page_by_url, source.url,
|
||||||
std::move(new_promise));
|
std::move(promise));
|
||||||
},
|
},
|
||||||
[&](const FileSourceSavedAnimations &source) {
|
[&](const FileSourceSavedAnimations &source) {
|
||||||
/*
|
/*
|
||||||
// TODO this is wrong, because we shouldn't pass animations hash to the call
|
// TODO this is wrong, because we shouldn't pass animations hash to the call
|
||||||
// we also sometimes need to do two simultaneous calls one with and one without hash
|
// we also sometimes need to do two simultaneous calls one with and one without hash
|
||||||
send_closure_later(G()->animations_manager(), &AnimationsManager::reload_saved_animations,
|
send_closure_later(G()->animations_manager(), &AnimationsManager::reload_saved_animations,
|
||||||
true, std::move(new_promise));
|
true, std::move(promise));
|
||||||
*/
|
*/
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
lock.set_value(Unit());
|
|
||||||
|
FileReferenceManager::Destination FileReferenceManager::on_query_result(Destination dest, FileSourceId file_source_id,
|
||||||
|
Status status, int32 sub) {
|
||||||
|
VLOG(file_references) << "on_query_result " << dest.node_id << " " << dest.generation << " " << file_source_id << " "
|
||||||
|
<< status << " " << sub;
|
||||||
|
auto &node = nodes_[dest.node_id];
|
||||||
|
|
||||||
|
auto query = node.query.get();
|
||||||
|
if (!query) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
if (query->generation != dest.generation) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
query->active_queries--;
|
||||||
|
|
||||||
|
if (!query->proxy.empty()) {
|
||||||
|
query->active_queries -= sub;
|
||||||
|
auto new_proxy = on_query_result(query->proxy, file_source_id, std::move(status), query->active_queries);
|
||||||
|
if (!new_proxy.empty()) {
|
||||||
|
query->proxy = new_proxy;
|
||||||
|
}
|
||||||
|
run_node(dest.node_id);
|
||||||
|
return new_proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (status.is_ok()) {
|
||||||
|
for (auto &p : query->promises) {
|
||||||
|
p.set_value(Unit());
|
||||||
|
}
|
||||||
|
node.query = {};
|
||||||
|
}
|
||||||
|
if (status.is_error() && status.error().code() != 429 && status.error().code() < 500 && !G()->close_flag()) {
|
||||||
|
VLOG(file_references) << "Invalid source id " << file_source_id << " " << status;
|
||||||
|
remove_file_source(dest.node_id, file_source_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
run_node(dest.node_id);
|
||||||
|
return dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FileReferenceManager::update_file_reference(NodeId node_id, Promise<> promise) {
|
||||||
|
VLOG(file_references) << "update_file_reference " << node_id;
|
||||||
|
auto &node = nodes_[node_id];
|
||||||
|
if (!node.query) {
|
||||||
|
node.query = make_unique<Query>();
|
||||||
|
node.query->promises.push_back(std::move(promise));
|
||||||
|
node.query->generation = ++query_generation;
|
||||||
|
node.file_source_ids.reset_position();
|
||||||
|
VLOG(file_references) << "new query " << query_generation;
|
||||||
|
}
|
||||||
|
run_node(node_id);
|
||||||
|
}
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -22,14 +22,90 @@
|
|||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
extern int VERBOSITY_NAME(file_references);
|
extern int VERBOSITY_NAME(file_references);
|
||||||
|
template <class T>
|
||||||
|
class SetWithPosition {
|
||||||
|
public:
|
||||||
|
void add(T value) {
|
||||||
|
auto it = std::find(values_.begin(), values_.end(), value);
|
||||||
|
if (it != end(values_)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
values_.push_back(value);
|
||||||
|
}
|
||||||
|
void remove(T value) {
|
||||||
|
auto it = std::find(values_.begin(), values_.end(), value);
|
||||||
|
if (it == end(values_)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
size_t i = it - values_.begin();
|
||||||
|
values_.erase(it);
|
||||||
|
if (pos_ > i) {
|
||||||
|
pos_--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void reset_position() {
|
||||||
|
pos_ = 0;
|
||||||
|
}
|
||||||
|
T next() {
|
||||||
|
return values_[pos_++];
|
||||||
|
}
|
||||||
|
bool has_next() {
|
||||||
|
return pos_ < values_.size();
|
||||||
|
}
|
||||||
|
void merge(SetWithPosition &&other) {
|
||||||
|
std::vector<T> new_values_;
|
||||||
|
for (size_t i = 0; i < pos_; i++) {
|
||||||
|
new_values_.push_back(values_[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < other.pos_; i++) {
|
||||||
|
new_values_.push_back(other.values_[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = pos_; i < values_.size(); i++) {
|
||||||
|
new_values_.push_back(values_[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = other.pos_; i < other.values_.size(); i++) {
|
||||||
|
new_values_.push_back(other.values_[i]);
|
||||||
|
}
|
||||||
|
pos_ += other.values_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<T> values_;
|
||||||
|
size_t pos_{0};
|
||||||
|
};
|
||||||
|
|
||||||
class FileReferenceManager : public Actor {
|
class FileReferenceManager : public Actor {
|
||||||
|
struct Node;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FileSourceId create_message_file_source(FullMessageId full_message_id);
|
FileSourceId create_message_file_source(FullMessageId full_message_id);
|
||||||
|
|
||||||
void update_file_reference(FileId file_id, vector<FileSourceId> file_source_ids, Promise<> promise);
|
using NodeId = FileId;
|
||||||
|
void update_file_reference(NodeId node_id, Promise<> promise);
|
||||||
|
void add_file_source(NodeId node_id, FileSourceId file_source_id);
|
||||||
|
void remove_file_source(NodeId file_id, FileSourceId file_source_id);
|
||||||
|
void merge(NodeId to_node_id, NodeId from_node_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
struct Destination {
|
||||||
|
bool empty() const {
|
||||||
|
return node_id.empty();
|
||||||
|
}
|
||||||
|
NodeId node_id;
|
||||||
|
int64 generation;
|
||||||
|
};
|
||||||
|
struct Query {
|
||||||
|
std::vector<Promise<>> promises;
|
||||||
|
int32 active_queries{0};
|
||||||
|
Destination proxy;
|
||||||
|
int64 generation;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Node {
|
||||||
|
SetWithPosition<FileSourceId> file_source_ids;
|
||||||
|
unique_ptr<Query> query;
|
||||||
|
};
|
||||||
|
|
||||||
struct FileSourceMessage {
|
struct FileSourceMessage {
|
||||||
FullMessageId full_message_id;
|
FullMessageId full_message_id;
|
||||||
};
|
};
|
||||||
@ -60,6 +136,13 @@ class FileReferenceManager : public Actor {
|
|||||||
std::unordered_map<FullMessageId, FileSourceId, FullMessageIdHash> full_message_id_to_file_source_id_;
|
std::unordered_map<FullMessageId, FileSourceId, FullMessageIdHash> full_message_id_to_file_source_id_;
|
||||||
|
|
||||||
int32 last_file_source_id_{0};
|
int32 last_file_source_id_{0};
|
||||||
|
int64 query_generation{0};
|
||||||
|
|
||||||
|
std::unordered_map<NodeId, Node, FileIdHash> nodes_;
|
||||||
|
|
||||||
|
void run_node(NodeId node);
|
||||||
|
void send_query(Destination dest, FileSourceId file_source_id);
|
||||||
|
Destination on_query_result(Destination dest, FileSourceId file_source_id, Status status, int32 sub = 0);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -162,8 +162,8 @@ void FileNode::set_remote_location(const RemoteFileLocation &remote, FileLocatio
|
|||||||
void FileNode::delete_file_reference(Slice file_reference) {
|
void FileNode::delete_file_reference(Slice file_reference) {
|
||||||
if (remote_.type() == RemoteFileLocation::Type::Full && remote_.full().delete_file_reference(file_reference)) {
|
if (remote_.type() == RemoteFileLocation::Type::Full && remote_.full().delete_file_reference(file_reference)) {
|
||||||
VLOG(file_references) << "Delete file reference of file " << main_file_id_;
|
VLOG(file_references) << "Delete file reference of file " << main_file_id_;
|
||||||
upload_may_update_file_reference_ = true;
|
upload_was_update_file_reference_ = false;
|
||||||
download_may_update_file_reference_ = true;
|
download_was_update_file_reference_ = false;
|
||||||
on_pmc_changed();
|
on_pmc_changed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -256,27 +256,6 @@ void FileNode::set_generate_priority(int8 download_priority, int8 upload_priorit
|
|||||||
generate_upload_priority_ = upload_priority;
|
generate_upload_priority_ = upload_priority;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FileNode::add_file_source(FileSourceId file_source_id) {
|
|
||||||
if (std::find(file_source_ids_.begin(), file_source_ids_.end(), file_source_id) != file_source_ids_.end()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
VLOG(file_references) << "Add " << file_source_id << " to file " << main_file_id_;
|
|
||||||
upload_may_update_file_reference_ = true;
|
|
||||||
download_may_update_file_reference_ = true;
|
|
||||||
file_source_ids_.push_back(file_source_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
void FileNode::remove_file_source(FileSourceId file_source_id) {
|
|
||||||
auto it = std::find(file_source_ids_.begin(), file_source_ids_.end(), file_source_id);
|
|
||||||
if (it == file_source_ids_.end()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
VLOG(file_references) << "Remove " << file_source_id << " from file " << main_file_id_;
|
|
||||||
file_source_ids_.erase(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
void FileNode::on_changed() {
|
void FileNode::on_changed() {
|
||||||
on_pmc_changed();
|
on_pmc_changed();
|
||||||
on_info_changed();
|
on_info_changed();
|
||||||
@ -1098,7 +1077,7 @@ void FileManager::cancel_download(FileNodePtr node) {
|
|||||||
send_closure(file_load_manager_, &FileLoadManager::cancel, node->download_id_);
|
send_closure(file_load_manager_, &FileLoadManager::cancel, node->download_id_);
|
||||||
node->download_id_ = 0;
|
node->download_id_ = 0;
|
||||||
node->is_download_started_ = false;
|
node->is_download_started_ = false;
|
||||||
node->download_may_update_file_reference_ = !node->file_source_ids_.empty();
|
node->download_was_update_file_reference_ = false;
|
||||||
node->set_download_priority(0);
|
node->set_download_priority(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1108,7 +1087,7 @@ void FileManager::cancel_upload(FileNodePtr node) {
|
|||||||
}
|
}
|
||||||
send_closure(file_load_manager_, &FileLoadManager::cancel, node->upload_id_);
|
send_closure(file_load_manager_, &FileLoadManager::cancel, node->upload_id_);
|
||||||
node->upload_id_ = 0;
|
node->upload_id_ = 0;
|
||||||
node->upload_may_update_file_reference_ = !node->file_source_ids_.empty();
|
node->upload_was_update_file_reference_ = false;
|
||||||
node->set_upload_priority(0);
|
node->set_upload_priority(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1223,9 +1202,11 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
|||||||
node->set_local_location(other_node->local_, other_node->local_ready_size_, other_node->download_offset_,
|
node->set_local_location(other_node->local_, other_node->local_ready_size_, other_node->download_offset_,
|
||||||
other_node->local_ready_prefix_size_);
|
other_node->local_ready_prefix_size_);
|
||||||
node->download_id_ = other_node->download_id_;
|
node->download_id_ = other_node->download_id_;
|
||||||
|
node->download_was_update_file_reference_ = other_node->download_was_update_file_reference_;
|
||||||
node->is_download_started_ |= other_node->is_download_started_;
|
node->is_download_started_ |= other_node->is_download_started_;
|
||||||
node->set_download_priority(other_node->download_priority_);
|
node->set_download_priority(other_node->download_priority_);
|
||||||
other_node->download_id_ = 0;
|
other_node->download_id_ = 0;
|
||||||
|
other_node->download_was_update_file_reference_ = false;
|
||||||
other_node->is_download_started_ = false;
|
other_node->is_download_started_ = false;
|
||||||
other_node->download_priority_ = 0;
|
other_node->download_priority_ = 0;
|
||||||
other_node->download_offset_ = 0;
|
other_node->download_offset_ = 0;
|
||||||
@ -1249,9 +1230,11 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
|||||||
cancel_upload(node);
|
cancel_upload(node);
|
||||||
node->set_remote_location(other_node->remote_, other_node->remote_source_, other_node->remote_ready_size_);
|
node->set_remote_location(other_node->remote_, other_node->remote_source_, other_node->remote_ready_size_);
|
||||||
node->upload_id_ = other_node->upload_id_;
|
node->upload_id_ = other_node->upload_id_;
|
||||||
|
node->upload_was_update_file_reference_ = other_node->upload_was_update_file_reference_;
|
||||||
node->set_upload_priority(other_node->upload_priority_);
|
node->set_upload_priority(other_node->upload_priority_);
|
||||||
node->set_upload_pause(other_node->upload_pause_);
|
node->set_upload_pause(other_node->upload_pause_);
|
||||||
other_node->upload_id_ = 0;
|
other_node->upload_id_ = 0;
|
||||||
|
other_node->upload_was_update_file_reference_ = false;
|
||||||
other_node->upload_priority_ = 0;
|
other_node->upload_priority_ = 0;
|
||||||
other_node->set_upload_pause(FileId());
|
other_node->set_upload_pause(FileId());
|
||||||
} else {
|
} else {
|
||||||
@ -1297,13 +1280,15 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
|||||||
}
|
}
|
||||||
node->need_load_from_pmc_ |= other_node->need_load_from_pmc_;
|
node->need_load_from_pmc_ |= other_node->need_load_from_pmc_;
|
||||||
node->can_search_locally_ &= other_node->can_search_locally_;
|
node->can_search_locally_ &= other_node->can_search_locally_;
|
||||||
for (auto source_id : other_node->file_source_ids_) {
|
|
||||||
node->add_file_source(source_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (main_file_id_i == other_node_i) {
|
if (main_file_id_i == other_node_i) {
|
||||||
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::merge, other_node->main_file_id_,
|
||||||
|
node->main_file_id_);
|
||||||
node->main_file_id_ = other_node->main_file_id_;
|
node->main_file_id_ = other_node->main_file_id_;
|
||||||
node->main_file_id_priority_ = other_node->main_file_id_priority_;
|
node->main_file_id_priority_ = other_node->main_file_id_priority_;
|
||||||
|
} else {
|
||||||
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::merge, node->main_file_id_,
|
||||||
|
other_node->main_file_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool send_updates_flag = false;
|
bool send_updates_flag = false;
|
||||||
@ -1365,7 +1350,9 @@ void FileManager::add_file_source(FileId file_id, FileSourceId file_source_id) {
|
|||||||
if (!node) {
|
if (!node) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
node->add_file_source(file_source_id);
|
|
||||||
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::add_file_source, node->main_file_id_,
|
||||||
|
file_source_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FileManager::remove_file_source(FileId file_id, FileSourceId file_source_id) {
|
void FileManager::remove_file_source(FileId file_id, FileSourceId file_source_id) {
|
||||||
@ -1374,7 +1361,8 @@ void FileManager::remove_file_source(FileId file_id, FileSourceId file_source_id
|
|||||||
if (!node) {
|
if (!node) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
node->remove_file_source(file_source_id);
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::remove_file_source, node->main_file_id_,
|
||||||
|
file_source_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FileManager::try_flush_node_full(FileNodePtr node, bool new_remote, bool new_local, bool new_generate,
|
void FileManager::try_flush_node_full(FileNodePtr node, bool new_remote, bool new_local, bool new_generate,
|
||||||
@ -1761,25 +1749,20 @@ void FileManager::run_download(FileNodePtr node) {
|
|||||||
|
|
||||||
CHECK(node->download_id_ == 0);
|
CHECK(node->download_id_ == 0);
|
||||||
CHECK(!node->file_ids_.empty());
|
CHECK(!node->file_ids_.empty());
|
||||||
auto file_id = node->file_ids_.back();
|
auto file_id = node->main_file_id_;
|
||||||
|
|
||||||
// If file reference is needed
|
// If file reference is needed
|
||||||
if (!file_view.has_active_remote_location()) {
|
if (!file_view.has_active_remote_location()) {
|
||||||
VLOG(file_references) << "run_download: Do not have valid file_reference for file " << file_id;
|
VLOG(file_references) << "run_download: Do not have valid file_reference for file " << file_id;
|
||||||
QueryId id = queries_container_.create(Query{file_id, Query::DownloadWaitFileReferece});
|
QueryId id = queries_container_.create(Query{file_id, Query::DownloadWaitFileReferece});
|
||||||
node->download_id_ = id;
|
node->download_id_ = id;
|
||||||
if (node->file_source_ids_.empty()) {
|
if (node->download_was_update_file_reference_) {
|
||||||
on_error(id, Status::Error("Can't download file: have no valid file reference and no valid source id"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!node->download_may_update_file_reference_) {
|
|
||||||
on_error(id, Status::Error("Can't download file: have valid source id, but do not allowed to use it"));
|
on_error(id, Status::Error("Can't download file: have valid source id, but do not allowed to use it"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
node->download_may_update_file_reference_ = false;
|
node->download_was_update_file_reference_ = true;
|
||||||
|
|
||||||
send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id,
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id,
|
||||||
node->file_source_ids_,
|
|
||||||
PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||||
Status error;
|
Status error;
|
||||||
if (res.is_ok()) {
|
if (res.is_ok()) {
|
||||||
@ -2056,14 +2039,15 @@ void FileManager::run_upload(FileNodePtr node, std::vector<int> bad_parts) {
|
|||||||
CHECK(node->upload_id_ == 0);
|
CHECK(node->upload_id_ == 0);
|
||||||
if (file_view.has_remote_location() && !file_view.has_active_remote_location() &&
|
if (file_view.has_remote_location() && !file_view.has_active_remote_location() &&
|
||||||
file_view.get_type() != FileType::Thumbnail && file_view.get_type() != FileType::EncryptedThumbnail &&
|
file_view.get_type() != FileType::Thumbnail && file_view.get_type() != FileType::EncryptedThumbnail &&
|
||||||
!node->file_source_ids_.empty() && node->upload_may_update_file_reference_) {
|
!node->upload_was_update_file_reference_) {
|
||||||
QueryId id = queries_container_.create(Query{file_id, Query::UploadWaitFileReference});
|
QueryId id = queries_container_.create(Query{file_id, Query::UploadWaitFileReference});
|
||||||
node->upload_id_ = id;
|
node->upload_id_ = id;
|
||||||
node->upload_may_update_file_reference_ = false;
|
node->upload_was_update_file_reference_ = true;
|
||||||
|
|
||||||
send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id,
|
send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id,
|
||||||
node->file_source_ids_, PromiseCreator::lambda([id, actor_id = actor_id(this)](Result<Unit> res) {
|
PromiseCreator::lambda([id, actor_id = actor_id(this)](Result<Unit> res) {
|
||||||
send_closure(actor_id, &FileManager::on_error, id, Status::Error("FILE_UPLOAD_RESTART"));
|
send_closure(actor_id, &FileManager::on_error, id,
|
||||||
|
Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE"));
|
||||||
}));
|
}));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -2803,11 +2787,16 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (status.message() == "FILE_UPLOAD_RESTART") {
|
if (status.message() == "FILE_UPLOAD_RESTART") {
|
||||||
|
if (ends_with(status.message(), "WITH_FILE_REFERENCE")) {
|
||||||
|
node->upload_was_update_file_reference_ = true;
|
||||||
|
}
|
||||||
run_upload(node, {});
|
run_upload(node, {});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (begins_with(status.message(), "FILE_DOWNLOAD_RESTART")) {
|
if (begins_with(status.message(), "FILE_DOWNLOAD_RESTART")) {
|
||||||
if (!ends_with(status.message(), "WITH_FILE_REFERENCE")) {
|
if (ends_with(status.message(), "WITH_FILE_REFERENCE")) {
|
||||||
|
node->download_was_update_file_reference_ = true;
|
||||||
|
} else {
|
||||||
node->can_search_locally_ = false;
|
node->can_search_locally_ = false;
|
||||||
}
|
}
|
||||||
run_download(node);
|
run_download(node);
|
||||||
@ -2863,12 +2852,14 @@ std::pair<FileManager::Query, bool> FileManager::finish_query(QueryId query_id)
|
|||||||
}
|
}
|
||||||
if (node->download_id_ == query_id) {
|
if (node->download_id_ == query_id) {
|
||||||
node->download_id_ = 0;
|
node->download_id_ = 0;
|
||||||
|
node->download_was_update_file_reference_ = false;
|
||||||
node->is_download_started_ = false;
|
node->is_download_started_ = false;
|
||||||
node->set_download_priority(0);
|
node->set_download_priority(0);
|
||||||
was_active = true;
|
was_active = true;
|
||||||
}
|
}
|
||||||
if (node->upload_id_ == query_id) {
|
if (node->upload_id_ == query_id) {
|
||||||
node->upload_id_ = 0;
|
node->upload_id_ = 0;
|
||||||
|
node->upload_was_update_file_reference_ = false;
|
||||||
node->set_upload_priority(0);
|
node->set_upload_priority(0);
|
||||||
was_active = true;
|
was_active = true;
|
||||||
}
|
}
|
||||||
|
@ -81,9 +81,6 @@ class FileNode {
|
|||||||
|
|
||||||
void set_download_offset(int64 download_offset);
|
void set_download_offset(int64 download_offset);
|
||||||
|
|
||||||
void add_file_source(FileSourceId file_source_id);
|
|
||||||
void remove_file_source(FileSourceId file_source_id);
|
|
||||||
|
|
||||||
void on_changed();
|
void on_changed();
|
||||||
void on_info_changed();
|
void on_info_changed();
|
||||||
void on_pmc_changed();
|
void on_pmc_changed();
|
||||||
@ -110,8 +107,6 @@ class FileNode {
|
|||||||
FileLoadManager::QueryId download_id_ = 0;
|
FileLoadManager::QueryId download_id_ = 0;
|
||||||
int64 remote_ready_size_ = 0;
|
int64 remote_ready_size_ = 0;
|
||||||
|
|
||||||
std::vector<FileSourceId> file_source_ids_;
|
|
||||||
|
|
||||||
unique_ptr<FullGenerateFileLocation> generate_;
|
unique_ptr<FullGenerateFileLocation> generate_;
|
||||||
FileLoadManager::QueryId generate_id_ = 0;
|
FileLoadManager::QueryId generate_id_ = 0;
|
||||||
|
|
||||||
@ -151,8 +146,8 @@ class FileNode {
|
|||||||
bool pmc_changed_flag_{false};
|
bool pmc_changed_flag_{false};
|
||||||
bool info_changed_flag_{false};
|
bool info_changed_flag_{false};
|
||||||
|
|
||||||
bool upload_may_update_file_reference_{false};
|
bool upload_was_update_file_reference_{false};
|
||||||
bool download_may_update_file_reference_{false};
|
bool download_was_update_file_reference_{false};
|
||||||
|
|
||||||
void init_ready_size();
|
void init_ready_size();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user