tdlight/td/telegram/files/ResourceManager.cpp

180 lines
5.1 KiB
C++
Raw Normal View History

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/files/ResourceManager.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/ScopeGuard.h"
#include <algorithm>
namespace td {
void ResourceManager::register_worker(ActorShared<FileLoaderActor> callback, int32 priority) {
auto node_id = nodes_container_.create();
auto *node_ptr = nodes_container_.get(node_id);
*node_ptr = std::make_unique<Node>();
auto *node = (*node_ptr).get();
CHECK(node);
node->node_id = node_id;
node->callback_ = std::move(callback);
add_node(node_id, priority);
send_closure(node->callback_, &FileLoaderActor::set_resource_manager, actor_shared(this, node_id));
}
void ResourceManager::update_priority(int32 priority) {
if (stop_flag_) {
return;
}
auto node_id = get_link_token();
if (!remove_node(node_id)) {
return;
}
add_node(node_id, priority);
}
void ResourceManager::update_resources(const ResourceState &resource_state) {
if (stop_flag_) {
return;
}
auto node_id = get_link_token();
auto node_ptr = nodes_container_.get(node_id);
if (node_ptr == nullptr) {
return;
}
auto node = (*node_ptr).get();
CHECK(node);
VLOG(files) << "before total: " << resource_state_;
VLOG(files) << "before " << tag("node_id", node_id) << ": " << node->resource_state_;
resource_state_ -= node->resource_state_;
node->resource_state_.update_master(resource_state);
resource_state_ += node->resource_state_;
VLOG(files) << "after total: " << resource_state_;
VLOG(files) << "after " << tag("node_id", node_id) << ": " << node->resource_state_;
if (mode_ == Mode::Greedy) {
add_to_heap(node);
}
loop();
}
void ResourceManager::hangup_shared() {
auto node_id = get_link_token();
auto node_ptr = nodes_container_.get(node_id);
if (node_ptr == nullptr) {
return;
}
auto node = (*node_ptr).get();
CHECK(node);
if (node->in_heap()) {
by_estimated_extra_.erase(node->as_heap_node());
}
resource_state_ -= node->resource_state_;
remove_node(node_id);
nodes_container_.erase(node_id);
loop();
}
void ResourceManager::add_to_heap(Node *node) {
auto *heap_node = node->as_heap_node();
auto key = node->resource_state_.estimated_extra();
if (heap_node->in_heap()) {
if (key != 0) {
by_estimated_extra_.fix(key, heap_node);
} else {
by_estimated_extra_.erase(heap_node);
}
} else {
if (key != 0) {
by_estimated_extra_.insert(key, heap_node);
}
}
}
bool ResourceManager::satisfy_node(NodeId file_node_id) {
auto file_node_ptr = nodes_container_.get(file_node_id);
CHECK(file_node_ptr);
auto file_node = (*file_node_ptr).get();
CHECK(file_node);
auto part_size = narrow_cast<int64>(file_node->resource_state_.unit_size());
auto need = file_node->resource_state_.estimated_extra();
VLOG(files) << tag("need", need) << tag("part_size", part_size);
need = (need + part_size - 1) / part_size * part_size;
VLOG(files) << tag("need", need);
if (need == 0) {
return true;
}
auto give = resource_state_.unused();
give = std::min(need, give);
give -= give % part_size;
VLOG(files) << tag("give", give);
if (give == 0) {
return false;
}
resource_state_.start_use(give);
file_node->resource_state_.update_limit(give);
send_closure(file_node->callback_, &FileLoaderActor::update_resources, file_node->resource_state_);
return true;
}
void ResourceManager::loop() {
if (stop_flag_) {
if (nodes_container_.empty()) {
stop();
}
return;
}
auto active_limit = resource_state_.active_limit();
resource_state_.update_limit(2 * 1024 * (1 << 10) - active_limit);
LOG(INFO) << tag("unused", resource_state_.unused());
if (mode_ == Mode::Greedy) {
std::vector<Node *> to_add;
while (!by_estimated_extra_.empty()) {
auto *node = Node::from_heap_node(by_estimated_extra_.pop());
SCOPE_EXIT {
to_add.push_back(node);
};
if (!satisfy_node(node->node_id)) {
break;
}
}
for (auto *node : to_add) {
add_to_heap(node);
}
} else if (mode_ == Mode::Baseline) {
// plain
for (auto &it : to_xload_) {
auto file_node_id = it.second;
if (!satisfy_node(file_node_id)) {
break;
}
}
}
}
void ResourceManager::add_node(NodeId node_id, int32 priority) {
if (priority >= 0) {
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.first <= priority; });
to_xload_.insert(it, std::make_pair(priority, node_id));
} else {
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.first < -priority; });
to_xload_.insert(it, std::make_pair(-priority, node_id));
}
}
bool ResourceManager::remove_node(NodeId node_id) {
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.second == node_id; });
if (it != to_xload_.end()) {
to_xload_.erase(it);
return true;
}
return false;
}
} // namespace td