From b2ae91ef48b29aae43d876a813dbcffa2af7554b Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Sat, 26 Feb 2022 21:54:01 +0100 Subject: [PATCH] DownloadManager: WIP --- CMakeLists.txt | 1 - td/telegram/DownloadManager.cpp | 238 +++++++++++++++++++------------- td/telegram/DownloadManager.h | 4 +- 3 files changed, 147 insertions(+), 96 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index cc5e90b06..c0994596b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -507,7 +507,6 @@ set(TDLIB_SOURCE td/telegram/Document.h td/telegram/DocumentsManager.h td/telegram/DownloadManager.h - td/telegram/DownloadsDb.h td/telegram/DraftMessage.h td/telegram/EncryptedFile.h td/telegram/FileReferenceManager.h diff --git a/td/telegram/DownloadManager.cpp b/td/telegram/DownloadManager.cpp index 873b91d40..a90097100 100644 --- a/td/telegram/DownloadManager.cpp +++ b/td/telegram/DownloadManager.cpp @@ -11,25 +11,25 @@ #include "td/utils/tl_helpers.h" #include "td/actor/MultiPromise.h" +#include "td/telegram/FileReferenceManager.h" #include "td/telegram/files/FileSourceId.hpp" -#include "td/telegram/DownloadsDb.h" #include "td/telegram/Global.h" #include "td/telegram/logevent/LogEvent.h" #include "td/telegram/TdDb.h" #include "td/utils/algorithm.h" -#include "td/utils/FlatHashMap.h" - namespace td { -//TODO: replace LOG(ERROR) with something else struct FileDownloadInDb { int64 download_id{}; string unique_file_id; FileSourceId file_source_id; string search_text; - bool is_paused{}; int priority{}; + int created_at{}; + int completed_at{}; + + bool is_paused{}; template void store(StorerT &storer) const { @@ -40,6 +40,8 @@ struct FileDownloadInDb { td::store(unique_file_id, storer); td::store(file_source_id, storer); td::store(priority, storer); + td::store(created_at, storer); + td::store(completed_at, storer); } template @@ -51,6 +53,8 @@ struct FileDownloadInDb { td::parse(unique_file_id, parser); td::parse(file_source_id, parser); td::parse(priority, parser); + td::parse(created_at, parser); + td::parse(completed_at, parser); } }; @@ -62,24 +66,19 @@ class DownloadManagerImpl final : public DownloadManager { } Status toggle_is_paused(FileId file_id, bool is_paused) final { - if (!callback_) { - return Status::OK(); - } + TRY_STATUS(check_is_active()); - auto fit = by_file_id_.find(file_id); - if (fit == by_file_id_.end()) { - return Status::Error(400, "Can't find file"); - } - auto it = files_.find(fit->second); - CHECK(it != files_.end()); - - auto &file_info = it->second; + TRY_RESULT(file_info_ptr, get_file_info(file_id)); + auto &file_info = *file_info_ptr; if (!is_active(file_info)) { return Status::Error(400, "File is already downloaded"); } if (is_paused != file_info.is_paused) { - file_info.is_paused = is_paused; + with_file_info(file_info, [&](auto &file_info) { + file_info.is_paused = is_paused; + file_info.need_save_to_db = true; + }); if (is_paused) { callback_->pause_file(file_info.internal_file_id); } else { @@ -87,15 +86,11 @@ class DownloadManagerImpl final : public DownloadManager { } } - sync_with_db(file_info); - return Status::OK(); } Status toggle_all_is_paused(bool is_paused) final { - if (!callback_) { - return Status::Error("TODO: code and message`"); - } + TRY_STATUS(check_is_active()); for (auto &it : files_) { toggle_is_paused(it.second.file_id, is_paused); @@ -105,41 +100,34 @@ class DownloadManagerImpl final : public DownloadManager { } Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) final { - if (!callback_) { - return Status::Error("TODO: code and message`"); + TRY_STATUS(check_is_active()); + TRY_RESULT(file_info_ptr, get_file_info(file_id, file_source_id)); + auto &file_info = *file_info_ptr; + if (!file_info.is_paused) { + callback_->pause_file(file_info.internal_file_id); } - auto fit = by_file_id_.find(file_id); - if (fit == by_file_id_.end()) { - return Status::OK(); - } - auto it = files_.find(fit->second); - if (it != files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) { - auto &file_info = it->second; - if (!file_info.is_paused) { - callback_->pause_file(file_info.internal_file_id); - } - if (delete_from_cache) { - callback_->delete_file(file_info.internal_file_id); - } - by_internal_file_id_.erase(file_info.internal_file_id); - by_file_id_.erase(file_info.file_id); - hints_.remove(file_info.download_id); - counters_.total_count--; - counters_.downloaded_size -= file_info.downloaded_size; - counters_.total_size -= file_info.size; - - remove_from_db(file_info); - files_.erase(it); + if (delete_from_cache) { + callback_->delete_file(file_info.internal_file_id); } + by_internal_file_id_.erase(file_info.internal_file_id); + by_file_id_.erase(file_info.file_id); + hints_.remove(file_info.download_id); + unregister_file_info(file_info); + remove_from_db(file_info); + files_.erase(file_info.download_id); + return Status::OK(); + } + Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) final { + TRY_STATUS(check_is_active()); + TRY_RESULT(file_info_ptr, get_file_info(file_id, file_source_id)); + auto &file_info = *file_info_ptr; + hints_.add(file_info.download_id, search_by); return Status::OK(); } Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) final { - if (!callback_) { - return Status::Error("TODO: code and message`"); - } - //TODO(later): index? + TRY_STATUS(check_is_active()); std::vector to_remove; for (auto &it : files_) { FileInfo &file_info = it.second; @@ -158,9 +146,7 @@ class DownloadManagerImpl final : public DownloadManager { } Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) final { - if (!callback_) { - return Status::Error("TODO: code and message`"); - } + TRY_STATUS(check_is_active()); // TODO: maybe we should ignore this query in some cases remove_file(file_id, {}, false); @@ -172,6 +158,7 @@ class DownloadManagerImpl final : public DownloadManager { file_info.file_source_id = file_source_id; file_info.is_paused = false; file_info.priority = priority; + file_info.created_at = static_cast(G()->server_time()); by_internal_file_id_[file_info.internal_file_id] = file_info.download_id; by_file_id_[file_info.file_id] = file_info.download_id; hints_.add(file_info.download_id, search_by); @@ -179,26 +166,29 @@ class DownloadManagerImpl final : public DownloadManager { auto download_id = file_info.download_id; file_info.need_save_to_db = true; auto it = files_.emplace(download_id, file_info).first; + register_file_info(it->second); callback_->start_file(it->second.internal_file_id, it->second.priority); - sync_with_db(it->second); - counters_.total_count++; - update_counters(); return Status::OK(); } void search(string query, bool only_active, bool only_completed, string offset, int32 limit, Promise promise) final { - if (!callback_) { - return promise.set_error(Status::Error("TODO: code and message`")); + TRY_STATUS_PROMISE(promise, check_is_active()); + int64 offset_int64 = std::numeric_limits::max(); + if (!offset.empty()) { + auto r_offset = to_integer_safe(offset); + if (r_offset.is_error()) { + return promise.set_error(Status::Error(400, "Invalid offset")); + } + offset_int64 = r_offset.move_as_ok(); } - TRY_RESULT_PROMISE(promise, offset_int64, to_integer_safe(offset)); - auto ids = hints_.search(query, 200, true).second; + auto ids = query.empty() ? transform(files_, [](auto &it) { return it.first; }) : hints_.search(query, 200).second; remove_if(ids, [&](auto download_id) { - auto it = files_.find(download_id); - if (it == files_.end()) { + auto r = get_file_info(download_id); + if (r.is_error()) { return true; } - auto &file_info = it->second; + auto &file_info = *r.ok(); if (only_active && !is_active(file_info)) { return true; } @@ -222,6 +212,8 @@ class DownloadManagerImpl final : public DownloadManager { res.is_paused = file_info.is_paused; res.file_source_id = file_info.file_source_id; res.file_id = it->second.file_id; + res.add_date = file_info.created_at; + res.complete_date = file_info.completed_at; return res; }); if (last_id != 0) { @@ -234,24 +226,20 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return; } - - auto by_internal_file_id_it = by_internal_file_id_.find(internal_file_id); - if (by_internal_file_id_it == by_internal_file_id_.end()) { + auto r_file_info_ptr = get_file_info_by_internal(internal_file_id); + if (r_file_info_ptr.is_error()) { return; } - auto it = files_.find(by_internal_file_id_it->second); - CHECK(it != files_.end()); - auto &file_info = it->second; + auto &file_info = *r_file_info_ptr.ok(); - counters_.downloaded_size -= file_info.downloaded_size; - counters_.total_size -= file_info.size; - file_info.size = size; - file_info.downloaded_size = download_size; - counters_.downloaded_size += file_info.downloaded_size; - counters_.total_size += file_info.size; - file_info.is_paused = is_paused; - - update_counters(); + with_file_info(file_info, [&](FileInfo &file_info) { + file_info.size = size; + file_info.downloaded_size = download_size; + if (file_info.is_paused != is_paused) { + file_info.is_paused = is_paused; + file_info.need_save_to_db = true; + } + }); } void update_file_deleted(FileId internal_file_id) final { @@ -259,13 +247,12 @@ class DownloadManagerImpl final : public DownloadManager { return; } - auto fit = by_internal_file_id_.find(internal_file_id); - if (fit == by_internal_file_id_.end()) { + auto r_file_info_ptr = get_file_info_by_internal(internal_file_id); + if (r_file_info_ptr.is_error()) { return; } - auto it = files_.find(fit->second); - CHECK(it != files_.end()); - remove_file(it->second.file_id, {}, false); + auto &file_info = *r_file_info_ptr.ok(); + remove_file(file_info.file_id, {}, false); } private: @@ -279,8 +266,10 @@ class DownloadManagerImpl final : public DownloadManager { FileSourceId file_source_id{}; int64 size{}; int64 downloaded_size{}; + int32 created_at{}; + int32 completed_at{}; - bool need_save_to_db{false}; + mutable bool need_save_to_db{false}; }; FlatHashMap by_file_id_; @@ -307,7 +296,7 @@ class DownloadManagerImpl final : public DownloadManager { static std::string pmc_key(const FileInfo &file_info) { return PSTRING() << "dlds" << file_info.download_id; } - void sync_with_db(FileInfo &file_info) { + void sync_with_db(const FileInfo &file_info) { if (!file_info.need_save_to_db) { return; } @@ -354,11 +343,10 @@ class DownloadManagerImpl final : public DownloadManager { log_event_parse(in_db, value).ensure(); in_db.download_id = to_integer_safe(key.substr(4)).move_as_ok(); auto promise = mtps.get_promise(); - // TODO: load data from MessagesManager auto unique_file_id = std::move(in_db.unique_file_id); auto file_source_id = in_db.file_source_id; auto new_promise = [self = actor_id(this), in_db = std::move(in_db), - promise = std::move(promise)](Result res) mutable { + promise = std::move(promise)](Result res) mutable { if (res.is_error()) { promise.set_value({}); return; @@ -366,25 +354,28 @@ class DownloadManagerImpl final : public DownloadManager { send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db)); promise.set_value({}); }; - // auto send_closure(G()->messages_manager(), &MessagesManager::todo, unique_file_id, file_source_id, - // std::move(new_promise)); + send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_info, file_source_id, + std::move(unique_file_id), std::move(new_promise)); } } - void add_file_from_db(FileId file_id, FileDownloadInDb in_db) { + void add_file_from_db(FileSearchInfo file_search_info, FileDownloadInDb in_db) { FileInfo file_info; file_info.download_id = in_db.download_id; - file_info.file_id = file_id; - file_info.internal_file_id = callback_->dup_file_id(file_id); + file_info.file_id = file_search_info.file_id; + file_info.internal_file_id = callback_->dup_file_id(file_info.file_id); file_info.file_source_id = in_db.file_source_id; file_info.is_paused = false; file_info.priority = narrow_cast(in_db.priority); + file_info.completed_at = in_db.completed_at; + file_info.created_at = in_db.created_at; by_internal_file_id_[file_info.internal_file_id] = file_info.download_id; by_file_id_[file_info.file_id] = file_info.download_id; - //TODO: hints_.add(file_info.download_id, search_by); + hints_.add(file_info.download_id, file_search_info.search_text); auto download_id = file_info.download_id; auto it = files_.emplace(download_id, file_info).first; + register_file_info(it->second); callback_->start_file(it->second.internal_file_id, it->second.priority); } @@ -409,6 +400,63 @@ class DownloadManagerImpl final : public DownloadManager { sent_counters_ = counters_; callback_->update_counters(counters_); } + + td::Result get_file_info(FileId file_id, FileSourceId file_source_id = {}) { + auto it = by_file_id_.find(file_id); + if (it == by_file_id_.end()) { + return Status::Error(400, "Can't find file"); + } + return get_file_info(it->second, file_source_id); + } + + td::Result get_file_info_by_internal(FileId file_id) { + auto it = by_internal_file_id_.find(file_id); + if (it == by_internal_file_id_.end()) { + return Status::Error(400, "Can't find file"); + } + return get_file_info(it->second); + } + + td::Result get_file_info(int64 download_id, FileSourceId file_source_id = {}) { + auto it = files_.find(download_id); + if (it == files_.end()) { + return Status::Error(400, "Can't find file"); + } + if (file_source_id.is_valid() && file_source_id != it->second.file_source_id) { + return Status::Error(400, "Can't find file with such source"); + } + return &it->second; + } + + void unregister_file_info(const FileInfo &file_info) { + active_files_count_ -= is_active(file_info); + counters_.downloaded_size -= file_info.downloaded_size; + counters_.total_size -= file_info.size; + } + void register_file_info(FileInfo &file_info) { + active_files_count_ += is_active(file_info); + counters_.downloaded_size += file_info.downloaded_size; + counters_.total_size += file_info.size; + if (!is_active(file_info) && file_info.completed_at == 0) { + file_info.completed_at = static_cast(G()->server_time()); + file_info.need_save_to_db = true; + } + sync_with_db(file_info); + update_counters(); + } + template + void with_file_info(const FileInfo &const_file_info, F &&f) { + unregister_file_info(const_file_info); + auto &file_info = const_cast(const_file_info); + f(file_info); + register_file_info(file_info); + } + Status check_is_active() { + if (!callback_) { + return Status::Error("TODO: code and message`"); + } + return Status::OK(); + } }; unique_ptr DownloadManager::create() { @@ -421,6 +469,7 @@ tl_object_ptr DownloadManager::FoundFileDownloads::t tl_object_ptr DownloadManager::Counters::to_td_api() const { return make_tl_object(total_size, total_count, downloaded_size); } + template void DownloadManager::Counters::store(StorerT &storer) const { BEGIN_STORE_FLAGS(); @@ -429,11 +478,12 @@ void DownloadManager::Counters::store(StorerT &storer) const { td::store(total_count, storer); td::store(downloaded_size, storer); } + template void DownloadManager::Counters::parse(ParserT &parser) { BEGIN_PARSE_FLAGS(); END_PARSE_FLAGS(); - + td::parse(total_size, parser); td::parse(total_count, parser); td::parse(downloaded_size, parser); } diff --git a/td/telegram/DownloadManager.h b/td/telegram/DownloadManager.h index 6897c16c2..76540dcba 100644 --- a/td/telegram/DownloadManager.h +++ b/td/telegram/DownloadManager.h @@ -30,7 +30,8 @@ class DownloadManager : public Actor { int64 downloaded_size{}; bool operator==(const Counters &other) const { - return total_size == other.total_size && total_count == other.total_count && downloaded_size == other.downloaded_size; + return total_size == other.total_size && total_count == other.total_count && + downloaded_size == other.downloaded_size; } tl_object_ptr to_td_api() const; @@ -79,6 +80,7 @@ class DownloadManager : public Actor { virtual Status toggle_is_paused(FileId, bool is_paused) = 0; virtual Status toggle_all_is_paused(bool is_paused) = 0; virtual Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) = 0; + virtual Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) = 0; virtual Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) = 0; // Files are always added in is_paused = false state virtual Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) = 0;