DownloadManager fixes.

This commit is contained in:
levlam 2022-02-28 18:25:07 +03:00
parent 08675f2df1
commit 1f300c50e0
8 changed files with 162 additions and 121 deletions

View File

@ -17,19 +17,25 @@
#include "td/utils/algorithm.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Hints.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/tl_helpers.h"
#include <algorithm>
#include <functional>
#include <limits>
namespace td {
struct FileDownloadInDb {
int64 download_id{};
string unique_file_id;
FileSourceId file_source_id;
string search_text;
int priority{};
int created_at{};
int completed_at{};
int32 priority{};
int32 created_at{};
int32 completed_at{};
bool is_paused{};
template <class StorerT>
@ -68,25 +74,8 @@ class DownloadManagerImpl final : public DownloadManager {
Status toggle_is_paused(FileId file_id, bool is_paused) final {
TRY_STATUS(check_is_active());
TRY_RESULT(file_info_ptr, get_file_info(file_id));
auto &file_info = *file_info_ptr;
if (!is_active(file_info)) {
return Status::Error(400, "File is already downloaded");
}
if (is_paused != file_info.is_paused) {
with_file_info(file_info, [&](auto &file_info) {
file_info.is_paused = is_paused;
file_info.need_save_to_db = true;
});
if (is_paused) {
callback_->pause_file(file_info.internal_file_id);
} else {
callback_->start_file(file_info.internal_file_id, file_info.priority);
}
}
toggle_is_paused(*file_info_ptr, is_paused);
return Status::OK();
}
@ -94,7 +83,7 @@ class DownloadManagerImpl final : public DownloadManager {
TRY_STATUS(check_is_active());
for (auto &it : files_) {
toggle_is_paused(it.second.file_id, is_paused);
toggle_is_paused(it.second, is_paused);
}
return Status::OK();
@ -107,13 +96,13 @@ class DownloadManagerImpl final : public DownloadManager {
if (!file_info.is_paused) {
callback_->pause_file(file_info.internal_file_id);
}
unregister_file_info(file_info);
if (delete_from_cache) {
callback_->delete_file(file_info.internal_file_id);
}
by_internal_file_id_.erase(file_info.internal_file_id);
by_file_id_.erase(file_info.file_id);
hints_.remove(file_info.download_id);
unregister_file_info(file_info);
remove_from_db(file_info);
files_.erase(file_info.download_id);
@ -130,7 +119,7 @@ class DownloadManagerImpl final : public DownloadManager {
Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) final {
TRY_STATUS(check_is_active());
std::vector<FileId> to_remove;
vector<FileId> to_remove;
for (auto &it : files_) {
FileInfo &file_info = it.second;
if (only_active && !is_active(file_info)) {
@ -150,24 +139,24 @@ class DownloadManagerImpl final : public DownloadManager {
Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) final {
TRY_STATUS(check_is_active());
// TODO: maybe we should ignore this query in some cases
remove_file(file_id, {}, false);
auto download_id = next_download_id();
FileInfo file_info;
file_info.download_id = next_download_id();
file_info.download_id = download_id;
file_info.file_id = file_id;
file_info.internal_file_id = callback_->dup_file_id(file_id);
file_info.file_source_id = file_source_id;
file_info.is_paused = false;
file_info.priority = priority;
file_info.created_at = static_cast<int32>(G()->server_time());
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
by_file_id_[file_info.file_id] = file_info.download_id;
hints_.add(file_info.download_id, search_by);
auto download_id = file_info.download_id;
file_info.created_at = G()->unix_time();
file_info.need_save_to_db = true;
auto it = files_.emplace(download_id, file_info).first;
by_internal_file_id_[file_info.internal_file_id] = download_id;
by_file_id_[file_info.file_id] = download_id;
hints_.add(download_id, search_by);
auto it = files_.emplace(download_id, std::move(file_info)).first;
register_file_info(it->second);
callback_->start_file(it->second.internal_file_id, it->second.priority);
return Status::OK();
@ -176,6 +165,9 @@ class DownloadManagerImpl final : public DownloadManager {
void search(string query, bool only_active, bool only_completed, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundFileDownloads>> promise) final {
TRY_STATUS_PROMISE(promise, check_is_active());
if (limit <= 0) {
return promise.set_error(Status::Error(400, "Limit must be positive"));
}
int64 offset_int64 = std::numeric_limits<int64>::max();
if (!offset.empty()) {
auto r_offset = to_integer_safe<int64>(offset);
@ -184,12 +176,11 @@ class DownloadManagerImpl final : public DownloadManager {
}
offset_int64 = r_offset.move_as_ok();
}
auto ids = query.empty() ? transform(files_, [](auto &it) { return it.first; }) : hints_.search(query, 200).second;
remove_if(ids, [&](auto download_id) {
auto ids =
query.empty() ? transform(files_, [](auto &it) { return it.first; }) : hints_.search(query, 10000).second;
td::remove_if(ids, [&](auto download_id) {
auto r = get_file_info(download_id);
if (r.is_error()) {
return true;
}
CHECK(r.is_ok());
auto &file_info = *r.ok();
if (only_active && !is_active(file_info)) {
return true;
@ -197,27 +188,28 @@ class DownloadManagerImpl final : public DownloadManager {
if (only_completed && is_active(file_info)) {
return true;
}
if (download_id >= offset_int64) {
return true;
}
return false;
});
std::sort(ids.begin(), ids.end(), std::greater<>());
int32 total_count = narrow_cast<int32>(ids.size());
remove_if(ids, [offset_int64](auto id) { return id < offset_int64; });
if (static_cast<int32>(ids.size()) > limit) {
ids.resize(max(0, limit));
auto total_count = narrow_cast<int32>(ids.size());
if (total_count > limit) {
ids.resize(limit);
}
int64 last_id = 0;
auto file_downloads = transform(ids, [&](auto id) {
last_id = id;
FileInfo &file_info = files_[id];
return callback_->get_file_download_object(file_info.file_id, file_info.file_source_id, file_info.created_at,
file_info.completed_at, file_info.is_paused);
});
td::remove_if(file_downloads, [](const auto &file_download) { return file_download->message_ == nullptr; });
string next_offset;
if (last_id != 0) {
next_offset = to_string(last_id);
if (!ids.empty()) {
next_offset = to_string(ids.back());
}
return promise.set_value(td_api::make_object<td_api::foundFileDownloads>(total_count, Auto(), next_offset));
promise.set_value(
td_api::make_object<td_api::foundFileDownloads>(total_count, std::move(file_downloads), next_offset));
}
void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) final {
@ -257,23 +249,22 @@ class DownloadManagerImpl final : public DownloadManager {
unique_ptr<Callback> callback_;
struct FileInfo {
int64 download_id{};
FileId file_id;
FileId internal_file_id;
FileSourceId file_source_id;
int8 priority;
bool is_paused{};
FileId file_id{};
FileId internal_file_id{};
FileSourceId file_source_id{};
mutable bool need_save_to_db{};
int64 size{};
int64 downloaded_size{};
int32 created_at{};
int32 completed_at{};
mutable bool need_save_to_db{false};
};
FlatHashMap<FileId, int64, FileIdHash> by_file_id_;
FlatHashMap<FileId, int64, FileIdHash> by_internal_file_id_;
FlatHashMap<int64, FileInfo> files_;
size_t active_files_count_{0};
// size_t active_file_count_{};
Hints hints_;
Counters counters_;
@ -283,36 +274,38 @@ class DownloadManagerImpl final : public DownloadManager {
int64 max_download_id_{0};
int64 next_download_id() {
auto res = ++max_download_id_;
G()->td_db()->get_binlog_pmc()->set("dlds_max_id", to_string(res));
return res;
return ++max_download_id_;
}
bool is_active(const FileInfo &file_info) const {
static bool is_active(const FileInfo &file_info) {
return file_info.size == 0 || file_info.downloaded_size != file_info.size;
}
static std::string pmc_key(const FileInfo &file_info) {
return PSTRING() << "dlds" << file_info.download_id;
static string pmc_key(const FileInfo &file_info) {
return PSTRING() << "dlds#" << file_info.download_id;
}
void sync_with_db(const FileInfo &file_info) {
if (!file_info.need_save_to_db) {
return;
}
file_info.need_save_to_db = false;
FileDownloadInDb to_save;
to_save.download_id = file_info.download_id;
to_save.file_source_id = file_info.file_source_id;
to_save.is_paused = file_info.is_paused;
to_save.priority = file_info.priority;
to_save.created_at = file_info.created_at;
to_save.completed_at = file_info.completed_at;
to_save.unique_file_id = callback_->get_unique_file_id(file_info.file_id);
G()->td_db()->get_binlog_pmc()->set(pmc_key(file_info), log_event_store(to_save).as_slice().str());
}
void remove_from_db(const FileInfo &file_info) {
static void remove_from_db(const FileInfo &file_info) {
G()->td_db()->get_binlog_pmc()->erase(pmc_key(file_info));
}
void on_synchronized(Result<Unit>) {
LOG(ERROR) << "DownloadManager: synchornized";
void on_synchronized(Result<Unit> result) {
LOG(INFO) << "DownloadManager: synchronized";
is_synchonized_ = true;
update_counters();
}
@ -323,58 +316,68 @@ class DownloadManagerImpl final : public DownloadManager {
}
is_started_ = true;
auto serialized_counter = G()->td_db()->get_binlog_pmc()->get("dlds_counter");
Counters counters_from_db;
if (!serialized_counter.empty()) {
log_event_parse(counters_from_db, serialized_counter).ensure();
log_event_parse(sent_counters_, serialized_counter).ensure();
callback_->update_counters(sent_counters_);
}
callback_->update_counters(counters_from_db);
max_download_id_ = to_integer<int64>(G()->td_db()->get_binlog_pmc()->get("dlds_max_id"));
auto downloads_in_kv = G()->td_db()->get_binlog_pmc()->prefix_get("dlds#");
MultiPromiseActorSafe mtps("DownloadManager: initialization");
mtps.add_promise(promise_send_closure(actor_id(this), &DownloadManagerImpl::on_synchronized));
MultiPromiseActorSafe mpas("DownloadManager: initialization");
mpas.add_promise(promise_send_closure(actor_id(this), &DownloadManagerImpl::on_synchronized));
mpas.set_ignore_errors(true);
auto lock = mpas.get_promise();
for (auto &it : downloads_in_kv) {
Slice key = it.first;
Slice value = it.second;
FileDownloadInDb in_db;
log_event_parse(in_db, value).ensure();
in_db.download_id = to_integer_safe<int64>(key.substr(4)).move_as_ok();
auto promise = mtps.get_promise();
CHECK(in_db.download_id == to_integer_safe<int64>(key).ok());
max_download_id_ = max(in_db.download_id, max_download_id_);
auto promise = mpas.get_promise();
auto unique_file_id = std::move(in_db.unique_file_id);
auto file_source_id = in_db.file_source_id;
auto new_promise = [self = actor_id(this), in_db = std::move(in_db),
auto new_promise = [actor_id = actor_id(this), in_db = std::move(in_db),
promise = std::move(promise)](Result<FileSearchInfo> res) mutable {
if (res.is_error()) {
promise.set_value({});
return;
return promise.set_value(Unit());
}
send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db));
promise.set_value({});
send_closure(actor_id, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db),
std::move(promise));
};
send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_search_info, file_source_id,
std::move(unique_file_id), std::move(new_promise));
}
lock.set_value(Unit());
}
void add_file_from_db(FileSearchInfo file_search_info, FileDownloadInDb in_db) {
void add_file_from_db(FileSearchInfo file_search_info, FileDownloadInDb in_db, Promise<Unit> promise) {
if (by_file_id_.count(file_search_info.file_id) != 0) {
// file has already been added
return promise.set_value(Unit());
}
auto download_id = in_db.download_id;
FileInfo file_info;
file_info.download_id = in_db.download_id;
file_info.download_id = download_id;
file_info.file_id = file_search_info.file_id;
file_info.internal_file_id = callback_->dup_file_id(file_info.file_id);
file_info.file_source_id = in_db.file_source_id;
file_info.is_paused = false;
file_info.is_paused = in_db.is_paused;
file_info.priority = narrow_cast<int8>(in_db.priority);
file_info.completed_at = in_db.completed_at;
file_info.created_at = in_db.created_at;
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
by_file_id_[file_info.file_id] = file_info.download_id;
hints_.add(file_info.download_id, file_search_info.search_text);
by_internal_file_id_[file_info.internal_file_id] = download_id;
by_file_id_[file_info.file_id] = download_id;
hints_.add(download_id, file_search_info.search_text);
auto download_id = file_info.download_id;
auto it = files_.emplace(download_id, file_info).first;
register_file_info(it->second);
callback_->start_file(it->second.internal_file_id, it->second.priority);
if (file_info.completed_at > 0) {
// file must be removed from the list if it isn't fully downloaded
} else {
auto it = files_.emplace(download_id, file_info).first;
register_file_info(it->second);
callback_->start_file(it->second.internal_file_id, it->second.priority);
promise.set_value(Unit());
}
}
void loop() final {
@ -388,6 +391,22 @@ class DownloadManagerImpl final : public DownloadManager {
callback_.reset();
}
void toggle_is_paused(const FileInfo &file_info, bool is_paused) {
if (!is_active(file_info) || is_paused == file_info.is_paused) {
return;
}
with_file_info(file_info, [&](auto &file_info) {
file_info.is_paused = is_paused;
file_info.need_save_to_db = true;
});
if (is_paused) {
callback_->pause_file(file_info.internal_file_id);
} else {
callback_->start_file(file_info.internal_file_id, file_info.priority);
}
}
void update_counters() {
if (!is_synchonized_) {
return;
@ -427,17 +446,19 @@ class DownloadManagerImpl final : public DownloadManager {
}
void unregister_file_info(const FileInfo &file_info) {
active_files_count_ -= is_active(file_info);
// active_file_count_ -= is_active(file_info);
counters_.downloaded_size -= file_info.downloaded_size;
counters_.total_size -= file_info.size;
counters_.total_size -= max(file_info.downloaded_size, file_info.size);
counters_.total_count--;
}
void register_file_info(FileInfo &file_info) {
active_files_count_ += is_active(file_info);
// active_file_count_ += is_active(file_info);
counters_.downloaded_size += file_info.downloaded_size;
counters_.total_size += file_info.size;
counters_.total_size += max(file_info.downloaded_size, file_info.size);
counters_.total_count++;
if (!is_active(file_info) && file_info.completed_at == 0) {
file_info.completed_at = static_cast<int32>(G()->server_time());
file_info.completed_at = G()->unix_time();
file_info.need_save_to_db = true;
}
sync_with_db(file_info);
@ -454,7 +475,8 @@ class DownloadManagerImpl final : public DownloadManager {
Status check_is_active() {
if (!callback_) {
return Status::Error("TODO: code and message`");
LOG(ERROR) << "DownloadManager wasn't initialized";
return Status::Error(500, "DownloadManager isn't initialized");
}
return Status::OK();
}

View File

@ -15,13 +15,12 @@
#include "td/utils/common.h"
#include "td/utils/Status.h"
#include "td/utils/tl_helpers.h"
namespace td {
class DownloadManager : public Actor {
public:
// creates, but do not stats the actor
// creates, but do not starts the actor
static unique_ptr<DownloadManager> create();
struct Counters {
@ -43,7 +42,7 @@ class DownloadManager : public Actor {
void parse(ParserT &parser);
};
// Trying to make DownloadManager testable, so all interactions with G() will be hidden is this probably monstrous interface
// trying to make DownloadManager testable, so all interactions with G() will be hidden is this probably monstrous interface
class Callback {
public:
virtual ~Callback() = default;
@ -67,21 +66,21 @@ class DownloadManager : public Actor {
// sets callback to handle all updates
virtual void set_callback(unique_ptr<Callback> callback) = 0;
virtual Status toggle_is_paused(FileId, bool is_paused) = 0;
virtual Status toggle_all_is_paused(bool is_paused) = 0;
virtual Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) = 0;
virtual Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) = 0;
virtual Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) = 0;
// Files are always added in is_paused = false state
// files are always added in is_paused = false state
virtual Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) = 0;
virtual void search(std::string query, bool only_active, bool only_completed, string offset, int32 limit,
virtual Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) = 0;
virtual Status toggle_is_paused(FileId file_id, bool is_paused) = 0;
virtual Status toggle_all_is_paused(bool is_paused) = 0;
virtual void search(string query, bool only_active, bool only_completed, string offset, int32 limit,
Promise<td_api::object_ptr<td_api::foundFileDownloads>> promise) = 0;
virtual Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) = 0;
virtual Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) = 0;
//
// private interface to handle all kinds of updates
//
virtual void update_file_download_state(FileId file_id, int64 download_size, int64 size, bool is_paused) = 0;
virtual void update_file_deleted(FileId file_id) = 0;
virtual void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) = 0;
virtual void update_file_deleted(FileId internal_file_id) = 0;
};
} // namespace td

View File

@ -22880,7 +22880,9 @@ void MessagesManager::remove_message_file_sources(DialogId dialog_id, const Mess
auto file_source_id = get_message_file_source_id(FullMessageId(dialog_id, m->message_id));
if (file_source_id.is_valid()) {
for (auto file_id : file_ids) {
send_closure(td_->download_manager_actor_, &DownloadManager::remove_file, file_id, file_source_id, false);
auto file_view = td_->file_manager_->get_file_view(file_id);
send_closure(td_->download_manager_actor_, &DownloadManager::remove_file, file_view.file_id(), file_source_id,
false);
td_->file_manager_->remove_file_source(file_id, file_source_id);
}
}
@ -22905,7 +22907,9 @@ void MessagesManager::change_message_files(DialogId dialog_id, const Message *m,
send_closure(G()->file_manager(), &FileManager::delete_file, file_id, Promise<>(), "change_message_files");
}
if (file_source_id.is_valid()) {
send_closure(td_->download_manager_actor_, &DownloadManager::remove_file, file_id, file_source_id, false);
auto file_view = td_->file_manager_->get_file_view(file_id);
send_closure(td_->download_manager_actor_, &DownloadManager::remove_file, file_view.file_id(), file_source_id,
false);
}
}
}
@ -35805,8 +35809,9 @@ bool MessagesManager::update_message_content(DialogId dialog_id, Message *old_me
if (file_source_id.is_valid()) {
auto search_text = get_message_search_text(old_message);
for (auto file_id : file_ids) {
send_closure(td_->download_manager_actor_, &DownloadManager::change_search_text, file_id, file_source_id,
search_text);
auto file_view = td_->file_manager_->get_file_view(file_id);
send_closure(td_->download_manager_actor_, &DownloadManager::change_search_text, file_view.file_id(),
file_source_id, search_text);
}
}
}
@ -39824,7 +39829,20 @@ void MessagesManager::add_message_file_to_downloads(FullMessageId full_message_i
if (m == nullptr) {
return promise.set_error(Status::Error(400, "Message not found"));
}
if (!contains(get_message_file_ids(m), file_id)) {
auto file_view = td_->file_manager_->get_file_view(file_id);
if (file_view.empty()) {
return promise.set_error(Status::Error(400, "File not found"));
}
file_id = file_view.file_id();
bool is_found = false;
for (auto message_file_id : get_message_file_ids(m)) {
auto message_file_view = td_->file_manager_->get_file_view(message_file_id);
CHECK(!message_file_view.empty());
if (message_file_view.file_id() == file_id) {
is_found = true;
}
}
if (!is_found) {
return promise.set_error(Status::Error(400, "Message has no specified file"));
}
auto search_text = get_message_search_text(m);

View File

@ -3989,8 +3989,8 @@ void Td::init_managers() {
FileManager::KEEP_DOWNLOAD_LIMIT);
}
void delete_file(FileId file_id) final {
send_closure(
G()->file_manager(), &FileManager::delete_file, file_id, [](Result<Unit>) {}, "download manager callback");
send_closure(G()->file_manager(), &FileManager::delete_file, file_id, Promise<Unit>(),
"download manager callback");
}
FileId dup_file_id(FileId file_id) final {
auto td = G()->td().get_actor_unsafe();
@ -4015,14 +4015,15 @@ void Td::init_managers() {
std::shared_ptr<FileManager::DownloadCallback> download_file_callback_;
std::shared_ptr<FileManager::DownloadCallback> make_download_file_callback() {
if (!download_file_callback_) {
class Impl : public FileManager::DownloadCallback {
class Impl final : public FileManager::DownloadCallback {
public:
Impl(ActorId<DownloadManager> download_manager) : download_manager_(download_manager) {
explicit Impl(ActorId<DownloadManager> download_manager) : download_manager_(download_manager) {
}
void on_progress(FileId file_id) final {
auto view = G()->file_manager().get_actor_unsafe()->get_file_view(file_id);
auto td = G()->td().get_actor_unsafe();
auto file_view = td->file_manager_->get_file_view(file_id);
send_closure(download_manager_, &DownloadManager::update_file_download_state, file_id,
view.local_total_size(), view.size(), !view.is_downloading());
file_view.local_total_size(), file_view.size(), !file_view.is_downloading());
// TODO: handle deleted state?
}
void on_download_ok(FileId file_id) final {

View File

@ -207,9 +207,9 @@ void FileNode::update_effective_download_limit(int64 old_download_limit) {
return;
}
// Should be no false positives here
// There should be no false positives here
// When we use IGNORE_DOWNLOAD_LIMIT, set_download_limit will be ignored
// And in case we turn off ignore_download_limit, set_download_limit will not change effective downoad limit
// And in case we turn off ignore_download_limit, set_download_limit will not change effective download limit
VLOG(update_file) << "File " << main_file_id_ << " has changed download_limit from " << old_download_limit << " to "
<< get_download_limit() << " (limit=" << private_download_limit_
<< ";ignore=" << ignore_download_limit_ << ")";
@ -1835,7 +1835,7 @@ void FileManager::try_flush_node_info(FileNodePtr node, const char *source) {
context_->on_file_updated(file_id);
}
if (info->download_callback_) {
// For DownloadManger. For everybody else it is just an empty function call (I hope).
// For DownloadManager. For everybody else it is just an empty function call (I hope).
info->download_callback_->on_progress(file_id);
}
}

View File

@ -6,6 +6,7 @@
//
#pragma once
#include "td/telegram/FileReferenceManager.h"
#include "td/telegram/FileReferenceManager.hpp"
#include "td/telegram/files/FileSourceId.h"
#include "td/telegram/Td.h"

View File

@ -12,7 +12,6 @@
#include "td/utils/FlatHashMapLinear.h"
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <initializer_list>
#include <iterator>

View File

@ -11,6 +11,7 @@
#include "td/utils/Random.h"
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <initializer_list>
#include <iterator>