Pass max_resource_limit as parameter to ResourceManager.

This commit is contained in:
levlam 2022-05-20 16:11:31 +03:00
parent 3886e9e644
commit 0bc767f943
8 changed files with 24 additions and 18 deletions

View File

@ -6,6 +6,7 @@
//
#include "td/telegram/files/FileLoadManager.h"
#include "td/telegram/ConfigShared.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/TdParameters.h"
@ -22,8 +23,11 @@ FileLoadManager::FileLoadManager(ActorShared<Callback> callback, ActorShared<> p
}
void FileLoadManager::start_up() {
upload_resource_manager_ =
create_actor<ResourceManager>("UploadResourceManager", !G()->parameters().use_file_db /*tdlib_engine*/
if (G()->shared_config().get_option_boolean("is_premium")) {
max_resource_limit_ *= 8;
}
upload_resource_manager_ = create_actor<ResourceManager>("UploadResourceManager", max_resource_limit_,
!G()->parameters().use_file_db /*tdlib_engine*/
? ResourceManager::Mode::Greedy
: ResourceManager::Mode::Baseline);
}
@ -32,7 +36,7 @@ ActorOwn<ResourceManager> &FileLoadManager::get_download_resource_manager(bool i
auto &actor = is_small ? download_small_resource_manager_map_[dc_id] : download_resource_manager_map_[dc_id];
if (actor.empty()) {
actor = create_actor<ResourceManager>(
PSLICE() << "DownloadResourceManager " << tag("is_small", is_small) << tag("dc_id", dc_id),
PSLICE() << "DownloadResourceManager " << tag("is_small", is_small) << tag("dc_id", dc_id), max_resource_limit_,
ResourceManager::Mode::Baseline);
}
return actor;
@ -172,7 +176,7 @@ void FileLoadManager::update_downloaded_part(QueryId id, int64 offset, int64 lim
if (node == nullptr) {
return;
}
send_closure(node->loader_, &FileLoaderActor::update_downloaded_part, offset, limit);
send_closure(node->loader_, &FileLoaderActor::update_downloaded_part, offset, limit, max_resource_limit_);
}
void FileLoadManager::hangup() {

View File

@ -75,6 +75,7 @@ class FileLoadManager final : public Actor {
ActorShared<Callback> callback_;
ActorShared<> parent_;
std::map<QueryId, NodeId> query_id_to_node_id_;
int64 max_resource_limit_ = 1 << 21;
bool stop_flag_ = false;
void start_up() final;

View File

@ -72,12 +72,12 @@ void FileLoader::update_local_file_location(const LocalFileLocation &local) {
loop();
}
void FileLoader::update_downloaded_part(int64 offset, int64 limit) {
void FileLoader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
if (parts_manager_.get_streaming_offset() != offset) {
auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit);
auto new_end_part_id = limit <= 0 ? parts_manager_.get_part_count()
: narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
auto max_parts = narrow_cast<int32>(ResourceManager::MAX_RESOURCE_LIMIT / parts_manager_.get_part_size());
auto max_parts = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size());
auto end_part_id = begin_part_id + td::min(max_parts, new_end_part_id - begin_part_id);
VLOG(file_loader) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1;
for (auto &it : part_map_) {

View File

@ -38,7 +38,7 @@ class FileLoader : public FileLoaderActor {
void update_resources(const ResourceState &other) final;
void update_local_file_location(const LocalFileLocation &local) final;
void update_downloaded_part(int64 offset, int64 limit) final;
void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) final;
protected:
void set_ordered_flag(bool flag);

View File

@ -22,10 +22,10 @@ class FileLoaderActor : public NetQueryCallback {
virtual void update_priority(int8 priority) = 0;
virtual void update_resources(const ResourceState &other) = 0;
// TODO: existence of these three functions is a dirty hack. Refactoring is highly appreciated
// TODO: existence of these two functions is a dirty hack. Refactoring is highly appreciated
virtual void update_local_file_location(const LocalFileLocation &local) {
}
virtual void update_downloaded_part(int64 offset, int64 limit) {
virtual void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
}
};

View File

@ -132,7 +132,7 @@ void ResourceManager::loop() {
return;
}
auto active_limit = resource_state_.active_limit();
resource_state_.update_limit(MAX_RESOURCE_LIMIT - active_limit);
resource_state_.update_limit(max_resource_limit_ - active_limit);
LOG(INFO) << tag("unused", resource_state_.unused());
if (mode_ == Mode::Greedy) {

View File

@ -21,7 +21,7 @@ namespace td {
class ResourceManager final : public Actor {
public:
enum class Mode : int32 { Baseline, Greedy };
explicit ResourceManager(Mode mode) : mode_(mode) {
ResourceManager(int64 max_resource_limit, Mode mode) : max_resource_limit_(max_resource_limit), mode_(mode) {
}
// use through ActorShared
void update_priority(int8 priority);
@ -29,10 +29,10 @@ class ResourceManager final : public Actor {
void register_worker(ActorShared<FileLoaderActor> callback, int8 priority);
static constexpr int64 MAX_RESOURCE_LIMIT = 1 << 21;
private:
int64 max_resource_limit_ = 0;
Mode mode_;
using NodeId = uint64;
struct Node final : public HeapNode {
NodeId node_id = 0;

View File

@ -168,9 +168,10 @@ Status NetQueryDispatcher::wait_dc_init(DcId dc_id, bool force) {
int32 slow_net_scheduler_id = G()->get_slow_net_scheduler_id();
auto raw_dc_id = dc_id.get_raw_id();
int32 upload_session_count = raw_dc_id != 2 && raw_dc_id != 4 ? 8 : 4;
int32 download_session_count = 2;
int32 download_small_session_count = 2;
bool is_premium = G()->shared_config().get_option_boolean("is_premium");
int32 upload_session_count = (raw_dc_id != 2 && raw_dc_id != 4) || is_premium ? 8 : 4;
int32 download_session_count = is_premium ? 8 : 2;
int32 download_small_session_count = is_premium ? 8 : 2;
dc.main_session_ = create_actor<SessionMultiProxy>(PSLICE() << "SessionMultiProxy:" << raw_dc_id << ":main",
session_count, auth_data, raw_dc_id == main_dc_id_, use_pfs,
false, false, is_cdn, need_destroy_key);