183 lines
5.2 KiB
C++
183 lines
5.2 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/telegram/files/ResourceManager.h"
|
|
|
|
#include "td/telegram/files/FileLoaderUtils.h"
|
|
|
|
#include "td/utils/common.h"
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/ScopeGuard.h"
|
|
|
|
#include <algorithm>
|
|
|
|
namespace td {
|
|
|
|
void ResourceManager::register_worker(ActorShared<FileLoaderActor> callback, int8 priority) {
|
|
auto node_id = nodes_container_.create();
|
|
auto *node_ptr = nodes_container_.get(node_id);
|
|
*node_ptr = make_unique<Node>();
|
|
auto *node = (*node_ptr).get();
|
|
CHECK(node);
|
|
node->node_id = node_id;
|
|
node->callback_ = std::move(callback);
|
|
|
|
add_node(node_id, priority);
|
|
send_closure(node->callback_, &FileLoaderActor::set_resource_manager, actor_shared(this, node_id));
|
|
}
|
|
|
|
void ResourceManager::update_priority(int8 priority) {
|
|
if (stop_flag_) {
|
|
return;
|
|
}
|
|
auto node_id = get_link_token();
|
|
if (!remove_node(node_id)) {
|
|
return;
|
|
}
|
|
add_node(node_id, priority);
|
|
}
|
|
|
|
void ResourceManager::update_resources(const ResourceState &resource_state) {
|
|
if (stop_flag_) {
|
|
return;
|
|
}
|
|
auto node_id = get_link_token();
|
|
auto node_ptr = nodes_container_.get(node_id);
|
|
if (node_ptr == nullptr) {
|
|
return;
|
|
}
|
|
auto node = (*node_ptr).get();
|
|
CHECK(node);
|
|
VLOG(file_loader) << "Before total: " << resource_state_ << "; node " << node_id << ": " << node->resource_state_;
|
|
resource_state_ -= node->resource_state_;
|
|
node->resource_state_.update_master(resource_state);
|
|
resource_state_ += node->resource_state_;
|
|
VLOG(file_loader) << "After total: " << resource_state_ << "; node " << node_id << ": " << node->resource_state_;
|
|
|
|
if (mode_ == Mode::Greedy) {
|
|
add_to_heap(node);
|
|
}
|
|
loop();
|
|
}
|
|
|
|
void ResourceManager::hangup_shared() {
|
|
auto node_id = get_link_token();
|
|
auto node_ptr = nodes_container_.get(node_id);
|
|
if (node_ptr == nullptr) {
|
|
return;
|
|
}
|
|
auto node = (*node_ptr).get();
|
|
CHECK(node);
|
|
if (node->in_heap()) {
|
|
by_estimated_extra_.erase(node->as_heap_node());
|
|
}
|
|
resource_state_ -= node->resource_state_;
|
|
remove_node(node_id);
|
|
nodes_container_.erase(node_id);
|
|
loop();
|
|
}
|
|
|
|
void ResourceManager::add_to_heap(Node *node) {
|
|
auto *heap_node = node->as_heap_node();
|
|
auto key = node->resource_state_.estimated_extra();
|
|
if (heap_node->in_heap()) {
|
|
if (key != 0) {
|
|
by_estimated_extra_.fix(key, heap_node);
|
|
} else {
|
|
by_estimated_extra_.erase(heap_node);
|
|
}
|
|
} else {
|
|
if (key != 0) {
|
|
by_estimated_extra_.insert(key, heap_node);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool ResourceManager::satisfy_node(NodeId file_node_id) {
|
|
auto file_node_ptr = nodes_container_.get(file_node_id);
|
|
CHECK(file_node_ptr);
|
|
auto file_node = (*file_node_ptr).get();
|
|
CHECK(file_node);
|
|
auto part_size = narrow_cast<int64>(file_node->resource_state_.unit_size());
|
|
auto need = file_node->resource_state_.estimated_extra();
|
|
VLOG(file_loader) << tag("need", need) << tag("part_size", part_size);
|
|
need = (need + part_size - 1) / part_size * part_size;
|
|
VLOG(file_loader) << tag("need", need);
|
|
if (need == 0) {
|
|
return true;
|
|
}
|
|
auto give = resource_state_.unused();
|
|
give = min(need, give);
|
|
give -= give % part_size;
|
|
VLOG(file_loader) << tag("give", give);
|
|
if (give == 0) {
|
|
return false;
|
|
}
|
|
resource_state_.start_use(give);
|
|
file_node->resource_state_.update_limit(give);
|
|
send_closure(file_node->callback_, &FileLoaderActor::update_resources, file_node->resource_state_);
|
|
return true;
|
|
}
|
|
|
|
void ResourceManager::loop() {
|
|
if (stop_flag_) {
|
|
if (nodes_container_.empty()) {
|
|
stop();
|
|
}
|
|
return;
|
|
}
|
|
auto active_limit = resource_state_.active_limit();
|
|
resource_state_.update_limit(MAX_RESOURCE_LIMIT - active_limit);
|
|
LOG(INFO) << tag("unused", resource_state_.unused());
|
|
|
|
if (mode_ == Mode::Greedy) {
|
|
std::vector<Node *> to_add;
|
|
while (!by_estimated_extra_.empty()) {
|
|
auto *node = Node::from_heap_node(by_estimated_extra_.pop());
|
|
SCOPE_EXIT {
|
|
to_add.push_back(node);
|
|
};
|
|
if (!satisfy_node(node->node_id)) {
|
|
break;
|
|
}
|
|
}
|
|
for (auto *node : to_add) {
|
|
add_to_heap(node);
|
|
}
|
|
} else if (mode_ == Mode::Baseline) {
|
|
// plain
|
|
for (auto &it : to_xload_) {
|
|
auto file_node_id = it.second;
|
|
if (!satisfy_node(file_node_id)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void ResourceManager::add_node(NodeId node_id, int8 priority) {
|
|
if (priority >= 0) {
|
|
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.first <= priority; });
|
|
to_xload_.insert(it, std::make_pair(priority, node_id));
|
|
} else {
|
|
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.first < -priority; });
|
|
to_xload_.insert(it, std::make_pair(narrow_cast<int8>(-priority), node_id));
|
|
}
|
|
}
|
|
|
|
bool ResourceManager::remove_node(NodeId node_id) {
|
|
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.second == node_id; });
|
|
if (it != to_xload_.end()) {
|
|
to_xload_.erase(it);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
} // namespace td
|