DownloadManager fixes and improvements.

This commit is contained in:
levlam 2022-02-27 18:23:06 +03:00
parent b2ae91ef48
commit a3cef00551
9 changed files with 82 additions and 66 deletions

View File

@ -6,17 +6,18 @@
// //
#include "td/telegram/DownloadManager.h" #include "td/telegram/DownloadManager.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Hints.h"
#include "td/utils/tl_helpers.h"
#include "td/actor/MultiPromise.h"
#include "td/telegram/FileReferenceManager.h" #include "td/telegram/FileReferenceManager.h"
#include "td/telegram/files/FileSourceId.hpp" #include "td/telegram/files/FileSourceId.hpp"
#include "td/telegram/Global.h" #include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h" #include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/TdDb.h" #include "td/telegram/TdDb.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/algorithm.h" #include "td/utils/algorithm.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Hints.h"
#include "td/utils/tl_helpers.h"
namespace td { namespace td {
@ -172,7 +173,7 @@ class DownloadManagerImpl final : public DownloadManager {
} }
void search(string query, bool only_active, bool only_completed, string offset, int32 limit, void search(string query, bool only_active, bool only_completed, string offset, int32 limit,
Promise<FoundFileDownloads> promise) final { Promise<td_api::object_ptr<td_api::foundFileDownloads>> promise) final {
TRY_STATUS_PROMISE(promise, check_is_active()); TRY_STATUS_PROMISE(promise, check_is_active());
int64 offset_int64 = std::numeric_limits<int64>::max(); int64 offset_int64 = std::numeric_limits<int64>::max();
if (!offset.empty()) { if (!offset.empty()) {
@ -198,28 +199,24 @@ class DownloadManagerImpl final : public DownloadManager {
return false; return false;
}); });
std::sort(ids.begin(), ids.end(), std::greater<>()); std::sort(ids.begin(), ids.end(), std::greater<>());
FoundFileDownloads found; int32 total_count = narrow_cast<int32>(ids.size());
found.total_count = narrow_cast<int32>(ids.size());
remove_if(ids, [offset_int64](auto id) { return id < offset_int64; }); remove_if(ids, [offset_int64](auto id) { return id < offset_int64; });
if (static_cast<int32>(ids.size()) > limit) { if (static_cast<int32>(ids.size()) > limit) {
ids.resize(max(0, limit)); ids.resize(max(0, limit));
} }
int64 last_id = 0; int64 last_id = 0;
found.file_downloads = transform(ids, [&](auto id) { auto file_downloads = transform(ids, [&](auto id) {
auto it = files_.find(id); last_id = id;
FileInfo &file_info = it->second; FileInfo &file_info = files_[id];
FileDownload res; return callback_->get_file_download_object(file_info.file_id, file_info.file_source_id, file_info.created_at,
res.is_paused = file_info.is_paused; file_info.completed_at, file_info.is_paused);
res.file_source_id = file_info.file_source_id;
res.file_id = it->second.file_id;
res.add_date = file_info.created_at;
res.complete_date = file_info.completed_at;
return res;
}); });
td::remove_if(file_downloads, [](const auto &file_download) { return file_download->message_ == nullptr; });
string next_offset;
if (last_id != 0) { if (last_id != 0) {
found.offset = to_string(last_id); next_offset = to_string(last_id);
} }
return promise.set_value(std::move(found)); return promise.set_value(td_api::make_object<td_api::foundFileDownloads>(total_count, Auto(), next_offset));
} }
void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) final { void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) final {
@ -354,7 +351,7 @@ class DownloadManagerImpl final : public DownloadManager {
send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db)); send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db));
promise.set_value({}); promise.set_value({});
}; };
send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_info, file_source_id, send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_search_info, file_source_id,
std::move(unique_file_id), std::move(new_promise)); std::move(unique_file_id), std::move(new_promise));
} }
} }
@ -401,7 +398,7 @@ class DownloadManagerImpl final : public DownloadManager {
callback_->update_counters(counters_); callback_->update_counters(counters_);
} }
td::Result<const FileInfo *> get_file_info(FileId file_id, FileSourceId file_source_id = {}) { Result<const FileInfo *> get_file_info(FileId file_id, FileSourceId file_source_id = {}) {
auto it = by_file_id_.find(file_id); auto it = by_file_id_.find(file_id);
if (it == by_file_id_.end()) { if (it == by_file_id_.end()) {
return Status::Error(400, "Can't find file"); return Status::Error(400, "Can't find file");
@ -409,7 +406,7 @@ class DownloadManagerImpl final : public DownloadManager {
return get_file_info(it->second, file_source_id); return get_file_info(it->second, file_source_id);
} }
td::Result<const FileInfo *> get_file_info_by_internal(FileId file_id) { Result<const FileInfo *> get_file_info_by_internal(FileId file_id) {
auto it = by_internal_file_id_.find(file_id); auto it = by_internal_file_id_.find(file_id);
if (it == by_internal_file_id_.end()) { if (it == by_internal_file_id_.end()) {
return Status::Error(400, "Can't find file"); return Status::Error(400, "Can't find file");
@ -417,7 +414,7 @@ class DownloadManagerImpl final : public DownloadManager {
return get_file_info(it->second); return get_file_info(it->second);
} }
td::Result<const FileInfo *> get_file_info(int64 download_id, FileSourceId file_source_id = {}) { Result<const FileInfo *> get_file_info(int64 download_id, FileSourceId file_source_id = {}) {
auto it = files_.find(download_id); auto it = files_.find(download_id);
if (it == files_.end()) { if (it == files_.end()) {
return Status::Error(400, "Can't find file"); return Status::Error(400, "Can't find file");
@ -433,6 +430,7 @@ class DownloadManagerImpl final : public DownloadManager {
counters_.downloaded_size -= file_info.downloaded_size; counters_.downloaded_size -= file_info.downloaded_size;
counters_.total_size -= file_info.size; counters_.total_size -= file_info.size;
} }
void register_file_info(FileInfo &file_info) { void register_file_info(FileInfo &file_info) {
active_files_count_ += is_active(file_info); active_files_count_ += is_active(file_info);
counters_.downloaded_size += file_info.downloaded_size; counters_.downloaded_size += file_info.downloaded_size;
@ -444,6 +442,7 @@ class DownloadManagerImpl final : public DownloadManager {
sync_with_db(file_info); sync_with_db(file_info);
update_counters(); update_counters();
} }
template <class F> template <class F>
void with_file_info(const FileInfo &const_file_info, F &&f) { void with_file_info(const FileInfo &const_file_info, F &&f) {
unregister_file_info(const_file_info); unregister_file_info(const_file_info);
@ -451,6 +450,7 @@ class DownloadManagerImpl final : public DownloadManager {
f(file_info); f(file_info);
register_file_info(file_info); register_file_info(file_info);
} }
Status check_is_active() { Status check_is_active() {
if (!callback_) { if (!callback_) {
return Status::Error("TODO: code and message`"); return Status::Error("TODO: code and message`");
@ -463,11 +463,8 @@ unique_ptr<DownloadManager> DownloadManager::create() {
return make_unique<DownloadManagerImpl>(); return make_unique<DownloadManagerImpl>();
} }
tl_object_ptr<td_api::foundFileDownloads> DownloadManager::FoundFileDownloads::to_td_api() const { td_api::object_ptr<td_api::updateFileDownloads> DownloadManager::Counters::get_update_file_downloads_object() const {
return make_tl_object<td_api::foundFileDownloads>(); return td_api::make_object<td_api::updateFileDownloads>(total_size, total_count, downloaded_size);
}
tl_object_ptr<td_api::updateFileDownloads> DownloadManager::Counters::to_td_api() const {
return make_tl_object<td_api::updateFileDownloads>(total_size, total_count, downloaded_size);
} }
template <class StorerT> template <class StorerT>
@ -487,4 +484,5 @@ void DownloadManager::Counters::parse(ParserT &parser) {
td::parse(total_count, parser); td::parse(total_count, parser);
td::parse(downloaded_size, parser); td::parse(downloaded_size, parser);
} }
} // namespace td } // namespace td

View File

@ -22,7 +22,7 @@ namespace td {
class DownloadManager : public Actor { class DownloadManager : public Actor {
public: public:
// creates, but do not stats the actor // creates, but do not stats the actor
static td::unique_ptr<DownloadManager> create(); static unique_ptr<DownloadManager> create();
struct Counters { struct Counters {
int64 total_size{}; int64 total_size{};
@ -34,7 +34,8 @@ class DownloadManager : public Actor {
downloaded_size == other.downloaded_size; downloaded_size == other.downloaded_size;
} }
tl_object_ptr<td_api::updateFileDownloads> to_td_api() const; td_api::object_ptr<td_api::updateFileDownloads> get_update_file_downloads_object() const;
template <class StorerT> template <class StorerT>
void store(StorerT &storer) const; void store(StorerT &storer) const;
@ -42,21 +43,6 @@ class DownloadManager : public Actor {
void parse(ParserT &parser); void parse(ParserT &parser);
}; };
struct FileDownload {
FileId file_id;
FileSourceId file_source_id;
bool is_paused{};
int32 add_date{};
int32 complete_date{};
};
struct FoundFileDownloads {
int32 total_count{};
vector<FileDownload> file_downloads;
string offset;
tl_object_ptr<td_api::foundFileDownloads> to_td_api() const;
};
// Trying to make DownloadManager testable, so all interactions with G() will be hidden is this probably monstrous interface // Trying to make DownloadManager testable, so all interactions with G() will be hidden is this probably monstrous interface
class Callback { class Callback {
public: public:
@ -68,6 +54,10 @@ class DownloadManager : public Actor {
virtual FileId dup_file_id(FileId file_id) = 0; virtual FileId dup_file_id(FileId file_id) = 0;
virtual string get_unique_file_id(FileId file_id) = 0; virtual string get_unique_file_id(FileId file_id) = 0;
virtual td_api::object_ptr<td_api::fileDownload> get_file_download_object(FileId file_id,
FileSourceId file_source_id,
int32 add_date, int32 complete_date,
bool is_paused) = 0;
}; };
// //
@ -85,7 +75,7 @@ class DownloadManager : public Actor {
// Files are always added in is_paused = false state // Files are always added in is_paused = false state
virtual Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) = 0; virtual Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) = 0;
virtual void search(std::string query, bool only_active, bool only_completed, string offset, int32 limit, virtual void search(std::string query, bool only_active, bool only_completed, string offset, int32 limit,
Promise<FoundFileDownloads> promise) = 0; Promise<td_api::object_ptr<td_api::foundFileDownloads>> promise) = 0;
// //
// private interface to handle all kinds of updates // private interface to handle all kinds of updates

View File

@ -394,16 +394,29 @@ void FileReferenceManager::reload_photo(PhotoSizeSource source, Promise<Unit> pr
} }
} }
void FileReferenceManager::get_file_info(FileSourceId file_source_id, string unique_file_id, void FileReferenceManager::get_file_search_info(FileSourceId file_source_id, string unique_file_id,
Promise<FileSearchInfo> promise) { Promise<FileSearchInfo> promise) {
auto index = static_cast<size_t>(file_source_id.get()) - 1; auto index = static_cast<size_t>(file_source_id.get()) - 1;
CHECK(index < file_sources_.size()); CHECK(index < file_sources_.size());
file_sources_[index].visit(overloaded( file_sources_[index].visit(overloaded(
[&](const FileSourceMessage &source) { [&](const FileSourceMessage &source) {
send_closure_later(G()->messages_manager(), &MessagesManager::get_message_file_info, source.full_message_id, send_closure_later(G()->messages_manager(), &MessagesManager::get_message_file_search_info,
std::move(unique_file_id), std::move(promise)); source.full_message_id, std::move(unique_file_id), std::move(promise));
}, },
[&](const auto &source) { promise.set_error(Status::Error(500, "Unsupported file source")); })); [&](const auto &source) { promise.set_error(Status::Error(500, "Unsupported file source")); }));
} }
td_api::object_ptr<td_api::message> FileReferenceManager::get_message_object(FileSourceId file_source_id) const {
auto index = static_cast<size_t>(file_source_id.get()) - 1;
CHECK(index < file_sources_.size());
td_api::object_ptr<td_api::message> result;
file_sources_[index].visit(overloaded(
[&](const FileSourceMessage &source) {
result = G()->td().get_actor_unsafe()->messages_manager_->get_message_object(source.full_message_id,
"FileReferenceManager");
},
[&](const auto &source) { LOG(ERROR) << "Unsupported file source"; }));
return result;
}
} // namespace td } // namespace td

View File

@ -14,6 +14,7 @@
#include "td/telegram/FullMessageId.h" #include "td/telegram/FullMessageId.h"
#include "td/telegram/PhotoSizeSource.h" #include "td/telegram/PhotoSizeSource.h"
#include "td/telegram/SetWithPosition.h" #include "td/telegram/SetWithPosition.h"
#include "td/telegram/td_api.h"
#include "td/telegram/UserId.h" #include "td/telegram/UserId.h"
#include "td/actor/actor.h" #include "td/actor/actor.h"
@ -60,7 +61,9 @@ class FileReferenceManager final : public Actor {
using NodeId = FileId; using NodeId = FileId;
void repair_file_reference(NodeId node_id, Promise<> promise); void repair_file_reference(NodeId node_id, Promise<> promise);
void get_file_info(FileSourceId file_source_id, string unique_file_id, Promise<FileSearchInfo> promise); void get_file_search_info(FileSourceId file_source_id, string unique_file_id, Promise<FileSearchInfo> promise);
td_api::object_ptr<td_api::message> get_message_object(FileSourceId file_source_id) const;
static void reload_photo(PhotoSizeSource source, Promise<Unit> promise); static void reload_photo(PhotoSizeSource source, Promise<Unit> promise);

View File

@ -39816,8 +39816,8 @@ void MessagesManager::add_message_file_to_downloads(FullMessageId full_message_i
promise.set_value(td_->file_manager_->get_file_object(file_id)); promise.set_value(td_->file_manager_->get_file_object(file_id));
} }
void MessagesManager::get_message_file_info(FullMessageId full_message_id, string unique_file_id, void MessagesManager::get_message_file_search_info(FullMessageId full_message_id, string unique_file_id,
Promise<FileSearchInfo> promise) { Promise<FileSearchInfo> promise) {
auto m = get_message_force(full_message_id, "add_message_file_to_downloads"); auto m = get_message_force(full_message_id, "add_message_file_to_downloads");
if (m == nullptr) { if (m == nullptr) {
return promise.set_error(Status::Error(200, "Message not found")); return promise.set_error(Status::Error(200, "Message not found"));

View File

@ -984,7 +984,8 @@ class MessagesManager final : public Actor {
void add_message_file_to_downloads(FullMessageId full_message_id, FileId file_id, int32 priority, void add_message_file_to_downloads(FullMessageId full_message_id, FileId file_id, int32 priority,
Promise<td_api::object_ptr<td_api::file>> promise); Promise<td_api::object_ptr<td_api::file>> promise);
void get_message_file_info(FullMessageId full_message_id, string unique_file_id, Promise<FileSearchInfo> promise); void get_message_file_search_info(FullMessageId full_message_id, string unique_file_id,
Promise<FileSearchInfo> promise);
private: private:
class PendingPtsUpdate { class PendingPtsUpdate {

View File

@ -3978,7 +3978,7 @@ void Td::init_managers() {
: parent_(std::move(parent)), download_manager_(download_manager) { : parent_(std::move(parent)), download_manager_(download_manager) {
} }
void update_counters(DownloadManager::Counters counters) final { void update_counters(DownloadManager::Counters counters) final {
send_closure(G()->td(), &Td::send_update, counters.to_td_api()); send_closure(G()->td(), &Td::send_update, counters.get_update_file_downloads_object());
} }
void start_file(FileId file_id, int8 priority) final { void start_file(FileId file_id, int8 priority) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(), priority, -1, send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(), priority, -1,
@ -3992,10 +3992,20 @@ void Td::init_managers() {
G()->file_manager(), &FileManager::delete_file, file_id, [](Result<Unit>) {}, "download manager callback"); G()->file_manager(), &FileManager::delete_file, file_id, [](Result<Unit>) {}, "download manager callback");
} }
FileId dup_file_id(FileId file_id) final { FileId dup_file_id(FileId file_id) final {
return G()->file_manager().get_actor_unsafe()->dup_file_id(file_id); auto td = G()->td().get_actor_unsafe();
return td->file_manager_->dup_file_id(file_id);
} }
string get_unique_file_id(FileId file_id) final { string get_unique_file_id(FileId file_id) final {
return G()->file_manager().get_actor_unsafe()->get_file_view(file_id).get_unique_file_id(); auto td = G()->td().get_actor_unsafe();
return td->file_manager_->get_file_view(file_id).get_unique_file_id();
}
td_api::object_ptr<td_api::fileDownload> get_file_download_object(FileId file_id, FileSourceId file_source_id,
int32 add_date, int32 complete_date,
bool is_paused) {
auto td = G()->td().get_actor_unsafe();
return td_api::make_object<td_api::fileDownload>(td->file_manager_->get_file_view(file_id).file_id().get(),
td->file_reference_manager_->get_message_object(file_source_id),
add_date, complete_date, is_paused);
} }
private: private:
@ -6607,12 +6617,8 @@ void Td::on_request(uint64 id, td_api::searchFileDownloads &request) {
CLEAN_INPUT_STRING(request.query_); CLEAN_INPUT_STRING(request.query_);
CLEAN_INPUT_STRING(request.offset_); CLEAN_INPUT_STRING(request.offset_);
CREATE_REQUEST_PROMISE(); CREATE_REQUEST_PROMISE();
auto wrapped_promise = [promise = std::move(promise)](Result<DownloadManager::FoundFileDownloads> result) mutable {
TRY_RESULT_PROMISE(promise, found, std::move(result));
promise.set_value(found.to_td_api());
};
send_closure(download_manager_actor_, &DownloadManager::search, std::move(request.query_), request.only_active_, send_closure(download_manager_actor_, &DownloadManager::search, std::move(request.query_), request.only_active_,
request.only_completed_, std::move(request.offset_), request.limit_, std::move(wrapped_promise)); request.only_completed_, std::move(request.offset_), request.limit_, std::move(promise));
} }
void Td::on_request(uint64 id, td_api::getMessageFileType &request) { void Td::on_request(uint64 id, td_api::getMessageFileType &request) {

View File

@ -1,15 +1,19 @@
// //
// Created by Arseny on 26.02.2022. // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
// //
#pragma once #pragma once
#include "td/telegram/FileReferenceManager.hpp" #include "td/telegram/FileReferenceManager.hpp"
#include "td/telegram/files/FileSourceId.h" #include "td/telegram/files/FileSourceId.h"
#include "td/telegram/Td.h" #include "td/telegram/Td.h"
namespace td { namespace td {
template <class StorerT> template <class StorerT>
void store(const FileSourceId &file_source_id, StorerT &storer) { void store(FileSourceId file_source_id, StorerT &storer) {
Td *td = storer.context()->td().get_actor_unsafe(); Td *td = storer.context()->td().get_actor_unsafe();
td->file_reference_manager_->store_file_source(file_source_id, storer); td->file_reference_manager_->store_file_source(file_source_id, storer);
} }
@ -19,4 +23,5 @@ void parse(FileSourceId &file_source_id, ParserT &parser) {
Td *td = parser.context()->td().get_actor_unsafe(); Td *td = parser.context()->td().get_actor_unsafe();
file_source_id = td->file_reference_manager_->parse_file_source(td, parser); file_source_id = td->file_reference_manager_->parse_file_source(td, parser);
} }
} // namespace td } // namespace td