Use WaitFreeHashMap to store nodes in FileReferenceManager.

This commit is contained in:
levlam 2022-11-17 19:13:36 +03:00
parent f085e7eea3
commit 5081ef4c2a
2 changed files with 33 additions and 21 deletions

View File

@ -37,7 +37,7 @@ FileReferenceManager::FileReferenceManager(ActorShared<> parent) : parent_(std::
}
FileReferenceManager::~FileReferenceManager() {
Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), file_sources_);
Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), file_sources_, nodes_);
}
void FileReferenceManager::tear_down() {
@ -155,16 +155,26 @@ FileSourceId FileReferenceManager::create_attach_menu_bot_file_source(UserId use
return add_file_source_id(source, PSLICE() << "attachment menu bot " << user_id);
}
bool FileReferenceManager::add_file_source(NodeId node_id, FileSourceId file_source_id) {
FileReferenceManager::Node &FileReferenceManager::add_node(NodeId node_id) {
CHECK(node_id.is_valid());
bool is_added = nodes_[node_id].file_source_ids.add(file_source_id);
auto &node = nodes_[node_id];
if (node == nullptr) {
node = make_unique<Node>();
}
return *node;
}
bool FileReferenceManager::add_file_source(NodeId node_id, FileSourceId file_source_id) {
auto &node = add_node(node_id);
bool is_added = node.file_source_ids.add(file_source_id);
VLOG(file_references) << "Add " << (is_added ? "new" : "old") << ' ' << file_source_id << " for file " << node_id;
return is_added;
}
bool FileReferenceManager::remove_file_source(NodeId node_id, FileSourceId file_source_id) {
CHECK(node_id.is_valid());
bool is_removed = nodes_[node_id].file_source_ids.remove(file_source_id);
auto *node = nodes_.get_pointer(node_id);
bool is_removed = node != nullptr && node->file_source_ids.remove(file_source_id);
if (is_removed) {
VLOG(file_references) << "Remove " << file_source_id << " from file " << node_id;
} else {
@ -174,11 +184,11 @@ bool FileReferenceManager::remove_file_source(NodeId node_id, FileSourceId file_
}
vector<FileSourceId> FileReferenceManager::get_some_file_sources(NodeId node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
auto *node = nodes_.get_pointer(node_id);
if (node == nullptr) {
return {};
}
return it->second.file_source_ids.get_some_elements();
return node->file_source_ids.get_some_elements();
}
vector<FullMessageId> FileReferenceManager::get_some_message_file_sources(NodeId node_id) {
@ -197,14 +207,13 @@ vector<FullMessageId> FileReferenceManager::get_some_message_file_sources(NodeId
}
void FileReferenceManager::merge(NodeId to_node_id, NodeId from_node_id) {
auto from_it = nodes_.find(from_node_id);
if (from_it == nodes_.end()) {
auto *from_node_ptr = nodes_.get_pointer(from_node_id);
if (from_node_ptr == nullptr) {
return;
}
auto &from = *from_node_ptr;
CHECK(to_node_id.is_valid());
auto &to = nodes_[to_node_id];
auto &from = from_it->second;
auto &to = add_node(to_node_id);
VLOG(file_references) << "Merge " << to.file_source_ids.size() << " and " << from.file_source_ids.size()
<< " sources of files " << to_node_id << " and " << from_node_id;
CHECK(!to.query || to.query->proxy.is_empty());
@ -227,7 +236,11 @@ void FileReferenceManager::merge(NodeId to_node_id, NodeId from_node_id) {
void FileReferenceManager::run_node(NodeId node_id) {
CHECK(node_id.is_valid());
Node &node = nodes_[node_id];
auto *node_ptr = nodes_.get_pointer(node_id);
if (node_ptr == nullptr) {
return;
}
Node &node = *node_ptr;
if (!node.query) {
return;
}
@ -266,8 +279,7 @@ void FileReferenceManager::run_node(NodeId node_id) {
void FileReferenceManager::send_query(Destination dest, FileSourceId file_source_id) {
VLOG(file_references) << "Send file reference repair query for file " << dest.node_id << " with generation "
<< dest.generation << " from " << file_source_id;
CHECK(dest.node_id.is_valid());
auto &node = nodes_[dest.node_id];
auto &node = add_node(dest.node_id);
node.query->active_queries++;
auto promise = PromiseCreator::lambda([dest, file_source_id, actor_id = actor_id(this),
@ -361,8 +373,7 @@ FileReferenceManager::Destination FileReferenceManager::on_query_result(Destinat
VLOG(file_references) << "Receive result of file reference repair query for file " << dest.node_id
<< " with generation " << dest.generation << " from " << file_source_id << ": " << status << " "
<< sub;
CHECK(dest.node_id.is_valid());
auto &node = nodes_[dest.node_id];
auto &node = add_node(dest.node_id);
auto query = node.query.get();
if (!query) {
@ -399,8 +410,7 @@ void FileReferenceManager::repair_file_reference(NodeId node_id, Promise<> promi
auto main_file_id = G()->td().get_actor_unsafe()->file_manager_->get_file_view(node_id).get_main_file_id();
VLOG(file_references) << "Repair file reference for file " << node_id << "/" << main_file_id;
node_id = main_file_id;
CHECK(node_id.is_valid());
auto &node = nodes_[node_id];
auto &node = add_node(node_id);
if (!node.query) {
node.query = make_unique<Query>();
node.query->generation = ++query_generation_;

View File

@ -20,12 +20,12 @@
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/logging.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/Variant.h"
#include "td/utils/WaitFreeHashMap.h"
#include "td/utils/WaitFreeVector.h"
namespace td {
@ -175,10 +175,12 @@ class FileReferenceManager final : public Actor {
int64 query_generation_{0};
FlatHashMap<NodeId, Node, FileIdHash> nodes_;
WaitFreeHashMap<NodeId, unique_ptr<Node>, FileIdHash> nodes_;
ActorShared<> parent_;
Node &add_node(NodeId node_id);
void run_node(NodeId node);
void send_query(Destination dest, FileSourceId file_source_id);
Destination on_query_result(Destination dest, FileSourceId file_source_id, Status status, int32 sub = 0);