diff --git a/CMakeLists.txt b/CMakeLists.txt index 71668a6e2..cc5e90b06 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -314,7 +314,6 @@ set(TDLIB_SOURCE td/telegram/Document.cpp td/telegram/DocumentsManager.cpp td/telegram/DownloadManager.cpp - td/telegram/DownloadsDb.cpp td/telegram/DraftMessage.cpp td/telegram/FileReferenceManager.cpp td/telegram/files/FileBitmask.cpp @@ -681,6 +680,7 @@ set(TDLIB_SOURCE td/telegram/files/FileId.hpp td/telegram/files/FileLocation.hpp td/telegram/files/FileManager.hpp + td/telegram/files/FileSourceId.hpp td/telegram/Game.hpp td/telegram/InputMessageText.hpp td/telegram/MessageEntity.hpp diff --git a/td/telegram/DownloadManager.cpp b/td/telegram/DownloadManager.cpp index 5cfe3b2fc..873b91d40 100644 --- a/td/telegram/DownloadManager.cpp +++ b/td/telegram/DownloadManager.cpp @@ -6,14 +6,54 @@ // #include "td/telegram/DownloadManager.h" +#include "td/utils/FlatHashMap.h" +#include "td/utils/Hints.h" +#include "td/utils/tl_helpers.h" + +#include "td/actor/MultiPromise.h" +#include "td/telegram/files/FileSourceId.hpp" #include "td/telegram/DownloadsDb.h" #include "td/telegram/Global.h" +#include "td/telegram/logevent/LogEvent.h" #include "td/telegram/TdDb.h" +#include "td/utils/algorithm.h" #include "td/utils/FlatHashMap.h" namespace td { +//TODO: replace LOG(ERROR) with something else +struct FileDownloadInDb { + int64 download_id{}; + string unique_file_id; + FileSourceId file_source_id; + string search_text; + bool is_paused{}; + int priority{}; + + template + void store(StorerT &storer) const { + BEGIN_STORE_FLAGS(); + STORE_FLAG(is_paused); + END_STORE_FLAGS(); + td::store(download_id, storer); + td::store(unique_file_id, storer); + td::store(file_source_id, storer); + td::store(priority, storer); + } + + template + void parse(ParserT &parser) { + BEGIN_PARSE_FLAGS(); + PARSE_FLAG(is_paused); + END_PARSE_FLAGS(); + td::parse(download_id, parser); + td::parse(unique_file_id, parser); + td::parse(file_source_id, parser); + td::parse(priority, parser); + } +}; + class DownloadManagerImpl final : public DownloadManager { public: void set_callback(unique_ptr callback) final { @@ -25,12 +65,19 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return Status::OK(); } - auto it = active_files_.find(file_id); - if (it == active_files_.end()) { + + auto fit = by_file_id_.find(file_id); + if (fit == by_file_id_.end()) { return Status::Error(400, "Can't find file"); } + auto it = files_.find(fit->second); + CHECK(it != files_.end()); auto &file_info = it->second; + if (!is_active(file_info)) { + return Status::Error(400, "File is already downloaded"); + } + if (is_paused != file_info.is_paused) { file_info.is_paused = is_paused; if (is_paused) { @@ -40,7 +87,7 @@ class DownloadManagerImpl final : public DownloadManager { } } - // TODO: update db + sync_with_db(file_info); return Status::OK(); } @@ -49,11 +96,11 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return Status::Error("TODO: code and message`"); } - for (auto &it : active_files_) { - toggle_is_paused(it.first, is_paused); + + for (auto &it : files_) { + toggle_is_paused(it.second.file_id, is_paused); } - // TODO: update db return Status::OK(); } @@ -61,8 +108,12 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return Status::Error("TODO: code and message`"); } - auto it = active_files_.find(file_id); - if (it != active_files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) { + auto fit = by_file_id_.find(file_id); + if (fit == by_file_id_.end()) { + return Status::OK(); + } + auto it = files_.find(fit->second); + if (it != files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) { auto &file_info = it->second; if (!file_info.is_paused) { callback_->pause_file(file_info.internal_file_id); @@ -71,10 +122,16 @@ class DownloadManagerImpl final : public DownloadManager { callback_->delete_file(file_info.internal_file_id); } by_internal_file_id_.erase(file_info.internal_file_id); + by_file_id_.erase(file_info.file_id); + hints_.remove(file_info.download_id); + counters_.total_count--; + counters_.downloaded_size -= file_info.downloaded_size; + counters_.total_size -= file_info.size; - active_files_.erase(it); + remove_from_db(file_info); + files_.erase(it); } - // TODO: remove from db + return Status::OK(); } @@ -82,21 +139,21 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return Status::Error("TODO: code and message`"); } - if (!only_completed) { - for (auto &it : active_files_) { - FileInfo &file_info = it.second; - if (!file_info.is_paused) { - callback_->pause_file(file_info.internal_file_id); - } - if (delete_from_cache) { - callback_->delete_file(file_info.internal_file_id); - } + //TODO(later): index? + std::vector to_remove; + for (auto &it : files_) { + FileInfo &file_info = it.second; + if (only_active && !is_active(file_info)) { + continue; } - active_files_.clear(); + if (only_completed && is_active(file_info)) { + continue; + } + to_remove.push_back(file_info.file_id); + } + for (auto file_id : to_remove) { + remove_file(file_id, {}, delete_from_cache); } - - // TODO: remove from db. should respect only_active - // TODO: if delete_from_cache, should iterate all files in db return Status::OK(); } @@ -104,24 +161,29 @@ class DownloadManagerImpl final : public DownloadManager { if (!callback_) { return Status::Error("TODO: code and message`"); } + + // TODO: maybe we should ignore this query in some cases + remove_file(file_id, {}, false); + FileInfo file_info; + file_info.download_id = next_download_id(); + file_info.file_id = file_id; file_info.internal_file_id = callback_->dup_file_id(file_id); file_info.file_source_id = file_source_id; file_info.is_paused = false; file_info.priority = priority; - by_internal_file_id_[file_info.internal_file_id] = file_id; + by_internal_file_id_[file_info.internal_file_id] = file_info.download_id; + by_file_id_[file_info.file_id] = file_info.download_id; + hints_.add(file_info.download_id, search_by); - if (active_files_.count(file_id) == 0) { - counters_.total_count++; - callback_->update_counters(counters_); - } - active_files_[file_id] = file_info; - callback_->start_file(file_info.internal_file_id, file_info.priority); - - G()->td_db()->get_downloads_db_async()->add_download( - DownloadsDbDownload{callback_->get_unique_file_id(file_id), - callback_->get_file_source_serialized(file_source_id), search_by, 0, priority}, - [](Result) {}); + auto download_id = file_info.download_id; + file_info.need_save_to_db = true; + auto it = files_.emplace(download_id, file_info).first; + callback_->start_file(it->second.internal_file_id, it->second.priority); + sync_with_db(it->second); + counters_.total_count++; + update_counters(); + return Status::OK(); } void search(string query, bool only_active, bool only_completed, string offset, int32 limit, @@ -130,10 +192,42 @@ class DownloadManagerImpl final : public DownloadManager { return promise.set_error(Status::Error("TODO: code and message`")); } TRY_RESULT_PROMISE(promise, offset_int64, to_integer_safe(offset)); - // TODO: only active, only completed - G()->td_db()->get_downloads_db_async()->get_downloads_fts(DownloadsDbFtsQuery{query, offset_int64, limit}, - [](Result) {}); - return promise.set_value({}); + auto ids = hints_.search(query, 200, true).second; + remove_if(ids, [&](auto download_id) { + auto it = files_.find(download_id); + if (it == files_.end()) { + return true; + } + auto &file_info = it->second; + if (only_active && !is_active(file_info)) { + return true; + } + if (only_completed && is_active(file_info)) { + return true; + } + return false; + }); + std::sort(ids.begin(), ids.end(), std::greater<>()); + FoundFileDownloads found; + found.total_count = narrow_cast(ids.size()); + remove_if(ids, [offset_int64](auto id) { return id < offset_int64; }); + if (static_cast(ids.size()) > limit) { + ids.resize(max(0, limit)); + } + int64 last_id = 0; + found.file_downloads = transform(ids, [&](auto id) { + auto it = files_.find(id); + FileInfo &file_info = it->second; + FileDownload res; + res.is_paused = file_info.is_paused; + res.file_source_id = file_info.file_source_id; + res.file_id = it->second.file_id; + return res; + }); + if (last_id != 0) { + found.offset = to_string(last_id); + } + return promise.set_value(std::move(found)); } void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) final { @@ -145,9 +239,10 @@ class DownloadManagerImpl final : public DownloadManager { if (by_internal_file_id_it == by_internal_file_id_.end()) { return; } - auto it = active_files_.find(by_internal_file_id_it->second); - CHECK(it != active_files_.end()); + auto it = files_.find(by_internal_file_id_it->second); + CHECK(it != files_.end()); auto &file_info = it->second; + counters_.downloaded_size -= file_info.downloaded_size; counters_.total_size -= file_info.size; file_info.size = size; @@ -156,15 +251,7 @@ class DownloadManagerImpl final : public DownloadManager { counters_.total_size += file_info.size; file_info.is_paused = is_paused; - if (download_size == size) { - active_files_.erase(it); - by_internal_file_id_.erase(by_internal_file_id_it); - - if (active_files_.empty()) { - counters_ = {}; - } - } - callback_->update_counters(counters_); + update_counters(); } void update_file_deleted(FileId internal_file_id) final { @@ -172,43 +259,156 @@ class DownloadManagerImpl final : public DownloadManager { return; } - auto it = by_internal_file_id_.find(internal_file_id); - if (it == by_internal_file_id_.end()) { + auto fit = by_internal_file_id_.find(internal_file_id); + if (fit == by_internal_file_id_.end()) { return; } - remove_file(it->second, {}, false); + auto it = files_.find(fit->second); + CHECK(it != files_.end()); + remove_file(it->second.file_id, {}, false); } private: unique_ptr callback_; struct FileInfo { + int64 download_id{}; int8 priority; bool is_paused{}; + FileId file_id{}; FileId internal_file_id{}; FileSourceId file_source_id{}; - int64 size{}; int64 downloaded_size{}; + + bool need_save_to_db{false}; }; - FlatHashMap active_files_; - FlatHashMap by_internal_file_id_; + + FlatHashMap by_file_id_; + FlatHashMap by_internal_file_id_; + FlatHashMap files_; + size_t active_files_count_{0}; + Hints hints_; Counters counters_; + Counters sent_counters_; + bool is_synchonized_{false}; + bool is_started_{false}; + int64 max_download_id_{0}; + + int64 next_download_id() { + auto res = ++max_download_id_; + G()->td_db()->get_binlog_pmc()->set("dlds_max_id", to_string(res)); + return res; + } + + bool is_active(const FileInfo &file_info) const { + return file_info.size == 0 || file_info.downloaded_size != file_info.size; + } + static std::string pmc_key(const FileInfo &file_info) { + return PSTRING() << "dlds" << file_info.download_id; + } + void sync_with_db(FileInfo &file_info) { + if (!file_info.need_save_to_db) { + return; + } + file_info.need_save_to_db = false; + FileDownloadInDb to_save; + to_save.download_id = file_info.download_id; + to_save.file_source_id = file_info.file_source_id; + to_save.is_paused = file_info.is_paused; + to_save.priority = file_info.priority; + to_save.unique_file_id = callback_->get_unique_file_id(file_info.file_id); + G()->td_db()->get_binlog_pmc()->set(pmc_key(file_info), log_event_store(to_save).as_slice().str()); + } + void remove_from_db(const FileInfo &file_info) { + G()->td_db()->get_binlog_pmc()->erase(pmc_key(file_info)); + } + + void on_synchronized(Result) { + LOG(ERROR) << "DownloadManager: synchornized"; + is_synchonized_ = true; + update_counters(); + } + + void try_start() { + if (is_started_) { + return; + } + is_started_ = true; + auto serialized_counter = G()->td_db()->get_binlog_pmc()->get("dlds_counter"); + Counters counters_from_db; + if (!serialized_counter.empty()) { + log_event_parse(counters_from_db, serialized_counter).ensure(); + } + callback_->update_counters(counters_from_db); + + max_download_id_ = to_integer(G()->td_db()->get_binlog_pmc()->get("dlds_max_id")); + + auto downloads_in_kv = G()->td_db()->get_binlog_pmc()->prefix_get("dlds#"); + MultiPromiseActorSafe mtps("DownloadManager: initialization"); + mtps.add_promise(promise_send_closure(actor_id(this), &DownloadManagerImpl::on_synchronized)); + for (auto &it : downloads_in_kv) { + Slice key = it.first; + Slice value = it.second; + FileDownloadInDb in_db; + log_event_parse(in_db, value).ensure(); + in_db.download_id = to_integer_safe(key.substr(4)).move_as_ok(); + auto promise = mtps.get_promise(); + // TODO: load data from MessagesManager + auto unique_file_id = std::move(in_db.unique_file_id); + auto file_source_id = in_db.file_source_id; + auto new_promise = [self = actor_id(this), in_db = std::move(in_db), + promise = std::move(promise)](Result res) mutable { + if (res.is_error()) { + promise.set_value({}); + return; + } + send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db)); + promise.set_value({}); + }; + // auto send_closure(G()->messages_manager(), &MessagesManager::todo, unique_file_id, file_source_id, + // std::move(new_promise)); + } + } + + void add_file_from_db(FileId file_id, FileDownloadInDb in_db) { + FileInfo file_info; + file_info.download_id = in_db.download_id; + file_info.file_id = file_id; + file_info.internal_file_id = callback_->dup_file_id(file_id); + file_info.file_source_id = in_db.file_source_id; + file_info.is_paused = false; + file_info.priority = narrow_cast(in_db.priority); + by_internal_file_id_[file_info.internal_file_id] = file_info.download_id; + by_file_id_[file_info.file_id] = file_info.download_id; + //TODO: hints_.add(file_info.download_id, search_by); + + auto download_id = file_info.download_id; + auto it = files_.emplace(download_id, file_info).first; + callback_->start_file(it->second.internal_file_id, it->second.priority); + } void loop() final { if (!callback_) { return; } - // TODO: ??? - // TODO: load active files from db - auto downloads = G()->td_db()->get_downloads_db_sync()->get_active_downloads().move_as_ok(); - for (auto &download : downloads.downloads) { - // ... - } + try_start(); } + void tear_down() final { callback_.reset(); } + + void update_counters() { + if (!is_synchonized_) { + return; + } + if (counters_ == sent_counters_) { + return; + } + sent_counters_ = counters_; + callback_->update_counters(counters_); + } }; unique_ptr DownloadManager::create() { @@ -221,4 +421,20 @@ tl_object_ptr DownloadManager::FoundFileDownloads::t tl_object_ptr DownloadManager::Counters::to_td_api() const { return make_tl_object(total_size, total_count, downloaded_size); } +template +void DownloadManager::Counters::store(StorerT &storer) const { + BEGIN_STORE_FLAGS(); + END_STORE_FLAGS(); + td::store(total_size, storer); + td::store(total_count, storer); + td::store(downloaded_size, storer); +} +template +void DownloadManager::Counters::parse(ParserT &parser) { + BEGIN_PARSE_FLAGS(); + END_PARSE_FLAGS(); + + td::parse(total_count, parser); + td::parse(downloaded_size, parser); +} } // namespace td diff --git a/td/telegram/DownloadManager.h b/td/telegram/DownloadManager.h index df86c96ea..6897c16c2 100644 --- a/td/telegram/DownloadManager.h +++ b/td/telegram/DownloadManager.h @@ -15,6 +15,7 @@ #include "td/utils/common.h" #include "td/utils/Status.h" +#include "td/utils/tl_helpers.h" namespace td { @@ -28,7 +29,16 @@ class DownloadManager : public Actor { int32 total_count{}; int64 downloaded_size{}; + bool operator==(const Counters &other) const { + return total_size == other.total_size && total_count == other.total_count && downloaded_size == other.downloaded_size; + } + tl_object_ptr to_td_api() const; + template + void store(StorerT &storer) const; + + template + void parse(ParserT &parser); }; struct FileDownload { @@ -57,7 +67,6 @@ class DownloadManager : public Actor { virtual FileId dup_file_id(FileId file_id) = 0; virtual string get_unique_file_id(FileId file_id) = 0; - virtual string get_file_source_serialized(FileSourceId file_source_id) = 0; }; // diff --git a/td/telegram/DownloadsDb.cpp b/td/telegram/DownloadsDb.cpp index c0c36ac1d..e69de29bb 100644 --- a/td/telegram/DownloadsDb.cpp +++ b/td/telegram/DownloadsDb.cpp @@ -1,334 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/telegram/DownloadsDb.h" - -#include "td/telegram/logevent/LogEvent.h" -#include "td/telegram/Version.h" - -#include "td/db/SqliteConnectionSafe.h" -#include "td/db/SqliteDb.h" -#include "td/db/SqliteStatement.h" - -#include "td/actor/actor.h" -#include "td/actor/PromiseFuture.h" -#include "td/actor/SchedulerLocalStorage.h" - -#include "td/utils/format.h" -#include "td/utils/logging.h" -#include "td/utils/ScopeGuard.h" -#include "td/utils/Slice.h" -#include "td/utils/SliceBuilder.h" -#include "td/utils/StackAllocator.h" -#include "td/utils/StringBuilder.h" -#include "td/utils/Time.h" -#include "td/utils/tl_helpers.h" -#include "td/utils/unicode.h" -#include "td/utils/utf8.h" - -#include -#include -#include -#include -#include -#include - -namespace td { - -static constexpr int32 MESSAGES_DB_INDEX_COUNT = 30; -static constexpr int32 MESSAGES_DB_INDEX_COUNT_OLD = 9; - -// NB: must happen inside a transaction -Status init_downloads_db(SqliteDb &db, int32 version) { - LOG(INFO) << "Init downloads database " << tag("version", version); - - // Check if database exists - TRY_RESULT(has_table, db.has_table("downloads")); - if (!has_table) { - version = 0; - } - - auto add_fts = [&db] { - TRY_STATUS( - db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS downloads_fts USING fts5(search_text, content='downloads', " - "content_rowid='download_id', tokenize = \"unicode61 remove_diacritics 0 tokenchars '\a'\")")); - TRY_STATUS( - db.exec("CREATE TRIGGER IF NOT EXISTS trigger_downloads_fts_delete BEFORE DELETE ON downloads" - " BEGIN INSERT INTO downloads_fts(downloads_fts, rowid, search_text) VALUES(\'delete\', " - "OLD.download_id, OLD.search_text); END")); - TRY_STATUS( - db.exec("CREATE TRIGGER IF NOT EXISTS trigger_downloads_fts_insert AFTER INSERT ON downloads" - " BEGIN INSERT INTO downloads_fts(rowid, search_text) VALUES(NEW.download_id, NEW.search_text); END")); - // TODO: update? - - return Status::OK(); - }; - - if (version == 0) { - TRY_STATUS( - db.exec("CREATE TABLE IF NOT EXISTS downloads(download_id INT8 PRIMARY KEY, unique_file_id " - "BLOB UNIQUE, file_source BLOB, search_text STRING, date INT4, priority INT4)")); - // TODO: add indexes - // TRY_STATUS( - // db.exec("CREATE INDEX IF NOT EXISTS message_by_random_id ON messages (dialog_id, random_id) " - // "WHERE random_id IS NOT NULL")); - - TRY_STATUS(add_fts()); - - version = current_db_version(); - } - return Status::OK(); -} - -// NB: must happen inside a transaction -Status drop_downloads_db(SqliteDb &db, int32 version) { - LOG(WARNING) << "Drop downloads database " << tag("version", version) - << tag("current_db_version", current_db_version()); - return db.exec("DROP TABLE IF EXISTS downloads"); -} - -class DownloadsDbImpl final : public DownloadsDbSyncInterface { - public: - explicit DownloadsDbImpl(SqliteDb db) : db_(std::move(db)) { - init().ensure(); - } - - Status init() { - TRY_RESULT_ASSIGN(add_download_stmt_, - db_.get_statement("INSERT OR REPLACE INTO downloads VALUES(NULL, ?1, ?2, ?3, ?4, ?5)")); - TRY_RESULT_ASSIGN( - get_downloads_fts_stmt_, - db_.get_statement("SELECT download_id, unique_file_id, file_source, priority FROM downloads WHERE download_id " - "IN (SELECT rowid FROM downloads_fts WHERE downloads_fts MATCH ?1 AND rowid < ?2 " - "ORDER BY rowid DESC LIMIT ?3) ORDER BY download_id DESC")); - - // LOG(ERROR) << get_message_stmt_.explain().ok(); - // LOG(ERROR) << get_messages_from_notification_id_stmt.explain().ok(); - // LOG(ERROR) << get_message_by_random_id_stmt_.explain().ok(); - // LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok(); - - // LOG(ERROR) << get_expiring_messages_stmt_.explain().ok(); - // LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok(); - - // LOG(FATAL) << "EXPLAINED"; - - return Status::OK(); - } - - Result get_downloads_fts(DownloadsDbFtsQuery query) final { - SCOPE_EXIT { - get_downloads_fts_stmt_.reset(); - }; - - auto &stmt = get_downloads_fts_stmt_; - stmt.bind_string(1, query.query).ensure(); - stmt.bind_int64(2, query.offset).ensure(); - stmt.bind_int32(3, query.limit).ensure(); - DownloadsDbFtsResult result; - auto status = stmt.step(); - if (status.is_error()) { - LOG(ERROR) << status; - return std::move(result); - } - while (stmt.has_row()) { - int64 download_id{stmt.view_int64(0)}; - string unique_file_id{stmt.view_string(1).str()}; - string file_source{stmt.view_string(2).str()}; - int32 priority{stmt.view_int32(3)}; - result.next_download_id = download_id; - result.downloads.push_back(DownloadsDbDownloadShort{std::move(unique_file_id), std::move(file_source), priority}); - stmt.step().ensure(); - } - return std::move(result); - } - - Status begin_write_transaction() final { - return db_.begin_write_transaction(); - } - Status commit_transaction() final { - return db_.commit_transaction(); - } - - Status add_download(DownloadsDbDownload download) override { - SCOPE_EXIT { - add_download_stmt_.reset(); - }; - auto &stmt = add_download_stmt_; - - TRY_RESULT_ASSIGN(add_download_stmt_, - db_.get_statement("INSERT OR REPLACE INTO downloads VALUES(NULL, ?1, ?2, ?3, ?4, ?5)")); - stmt.bind_blob(1, download.unique_file_id).ensure(); - stmt.bind_blob(2, download.file_source).ensure(); - stmt.bind_string(3, download.search_text).ensure(); - stmt.bind_int32(4, download.date).ensure(); - stmt.bind_int32(5, download.priority).ensure(); - - stmt.step().ensure(); - - return Status(); - } - Result get_active_downloads() override { - DownloadsDbFtsQuery query; - query.limit = 2000; - query.offset = uint64(1) << 60; - // TODO: optimize query - // TODO: only active - TRY_RESULT(ans, get_downloads_fts(query)); - return GetActiveDownloadsResult{std::move(ans.downloads)}; - } - - private: - SqliteDb db_; - - SqliteStatement add_download_stmt_; - SqliteStatement get_downloads_fts_stmt_; -}; - -std::shared_ptr create_downloads_db_sync( - std::shared_ptr sqlite_connection) { - class DownloadsDbSyncSafe final : public DownloadsDbSyncSafeInterface { - public: - explicit DownloadsDbSyncSafe(std::shared_ptr sqlite_connection) - : lsls_db_([safe_connection = std::move(sqlite_connection)] { - return make_unique(safe_connection->get().clone()); - }) { - } - DownloadsDbSyncInterface &get() final { - return *lsls_db_.get(); - } - - private: - LazySchedulerLocalStorage> lsls_db_; - }; - return std::make_shared(std::move(sqlite_connection)); -} - -class DownloadsDbAsync final : public DownloadsDbAsyncInterface { - public: - DownloadsDbAsync(std::shared_ptr sync_db, int32 scheduler_id) { - impl_ = create_actor_on_scheduler("DownloadsDbActor", scheduler_id, std::move(sync_db)); - } - - void add_download(DownloadsDbDownload query, Promise<> promise) final { - send_closure(impl_, &Impl::add_download, std::move(query), std::move(promise)); - } - - void get_active_downloads(Promise promise) final { - send_closure(impl_, &Impl::get_active_downloads, std::move(promise)); - } - void get_downloads_fts(DownloadsDbFtsQuery query, Promise promise) final { - send_closure(impl_, &Impl::get_downloads_fts, std::move(query), std::move(promise)); - } - - void close(Promise<> promise) final { - send_closure_later(impl_, &Impl::close, std::move(promise)); - } - - void force_flush() final { - send_closure_later(impl_, &Impl::force_flush); - } - - private: - class Impl final : public Actor { - public: - explicit Impl(std::shared_ptr sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) { - } - void add_download(DownloadsDbDownload query, Promise<> promise) { - add_write_query([this, query = std::move(query), promise = std::move(promise)](Unit) mutable { - on_write_result(std::move(promise), sync_db_->add_download(std::move(query))); - }); - } - - void get_downloads_fts(DownloadsDbFtsQuery query, Promise promise) { - add_read_query(); - promise.set_result(sync_db_->get_downloads_fts(std::move(query))); - } - - void get_active_downloads(Promise promise) { - add_read_query(); - promise.set_result(sync_db_->get_active_downloads()); - } - - void close(Promise<> promise) { - do_flush(); - sync_db_safe_.reset(); - sync_db_ = nullptr; - promise.set_value(Unit()); - stop(); - } - - void force_flush() { - LOG(INFO) << "DownloadsDb flushed"; - do_flush(); - } - - private: - std::shared_ptr sync_db_safe_; - DownloadsDbSyncInterface *sync_db_ = nullptr; - - static constexpr size_t MAX_PENDING_QUERIES_COUNT{50}; - static constexpr double MAX_PENDING_QUERIES_DELAY{0.01}; - - //NB: order is important, destructor of pending_writes_ will change pending_write_results_ - vector, Status>> pending_write_results_; - vector> pending_writes_; - double wakeup_at_ = 0; - - void on_write_result(Promise<> promise, Status status) { - // We are inside a transaction and don't know how to handle the error - status.ensure(); - pending_write_results_.emplace_back(std::move(promise), std::move(status)); - } - - template - void add_write_query(F &&f) { - pending_writes_.push_back(PromiseCreator::lambda(std::forward(f), PromiseCreator::Ignore())); - if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) { - do_flush(); - wakeup_at_ = 0; - } else if (wakeup_at_ == 0) { - wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY; - } - if (wakeup_at_ != 0) { - set_timeout_at(wakeup_at_); - } - } - void add_read_query() { - do_flush(); - } - void do_flush() { - if (pending_writes_.empty()) { - return; - } - sync_db_->begin_write_transaction().ensure(); - for (auto &query : pending_writes_) { - query.set_value(Unit()); - } - sync_db_->commit_transaction().ensure(); - pending_writes_.clear(); - for (auto &p : pending_write_results_) { - p.first.set_result(std::move(p.second)); - } - pending_write_results_.clear(); - cancel_timeout(); - } - void timeout_expired() final { - do_flush(); - } - - void start_up() final { - sync_db_ = &sync_db_safe_->get(); - } - }; - ActorOwn impl_; -}; - -std::shared_ptr create_downloads_db_async( - std::shared_ptr sync_db, int32 scheduler_id) { - return std::make_shared(std::move(sync_db), scheduler_id); -} - -} // namespace td diff --git a/td/telegram/DownloadsDb.h b/td/telegram/DownloadsDb.h deleted file mode 100644 index f98022642..000000000 --- a/td/telegram/DownloadsDb.h +++ /dev/null @@ -1,107 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/telegram/DialogId.h" -#include "td/telegram/FullMessageId.h" -#include "td/telegram/MessageId.h" -#include "td/telegram/MessageSearchFilter.h" -#include "td/telegram/NotificationId.h" -#include "td/telegram/ServerMessageId.h" - -#include "td/actor/PromiseFuture.h" - -#include "td/utils/buffer.h" -#include "td/utils/common.h" -#include "td/utils/Status.h" - -#include -#include - -namespace td { - -class SqliteConnectionSafe; -class SqliteDb; - -struct DownloadsDbFtsQuery { - string query; - int64 offset{0}; - int32 limit{0}; -}; - -struct DownloadsDbDownloadShort { - string unique_file_id; - string file_source; - int32 priority; -}; -struct DownloadsDbDownload { - string unique_file_id; - string file_source; - string search_text; - int32 date; - int32 priority; -}; - -struct GetActiveDownloadsResult { - std::vector downloads; -}; - -struct DownloadsDbFtsResult { - vector downloads; - int64 next_download_id{}; -}; - -class DownloadsDbSyncInterface { - public: - DownloadsDbSyncInterface() = default; - DownloadsDbSyncInterface(const DownloadsDbSyncInterface &) = delete; - DownloadsDbSyncInterface &operator=(const DownloadsDbSyncInterface &) = delete; - virtual ~DownloadsDbSyncInterface() = default; - - virtual Status add_download(DownloadsDbDownload) = 0; - virtual Result get_active_downloads() = 0; - virtual Result get_downloads_fts(DownloadsDbFtsQuery query) = 0; - - virtual Status begin_write_transaction() = 0; - virtual Status commit_transaction() = 0; -}; - -class DownloadsDbSyncSafeInterface { - public: - DownloadsDbSyncSafeInterface() = default; - DownloadsDbSyncSafeInterface(const DownloadsDbSyncSafeInterface &) = delete; - DownloadsDbSyncSafeInterface &operator=(const DownloadsDbSyncSafeInterface &) = delete; - virtual ~DownloadsDbSyncSafeInterface() = default; - - virtual DownloadsDbSyncInterface &get() = 0; -}; - -class DownloadsDbAsyncInterface { - public: - DownloadsDbAsyncInterface() = default; - DownloadsDbAsyncInterface(const DownloadsDbAsyncInterface &) = delete; - DownloadsDbAsyncInterface &operator=(const DownloadsDbAsyncInterface &) = delete; - virtual ~DownloadsDbAsyncInterface() = default; - - virtual void add_download(DownloadsDbDownload, Promise<>) = 0; - virtual void get_active_downloads(Promise) = 0; - virtual void get_downloads_fts(DownloadsDbFtsQuery query, Promise) = 0; - - virtual void close(Promise<> promise) = 0; - virtual void force_flush() = 0; -}; - -Status init_downloads_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT; -Status drop_downloads_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT; - -std::shared_ptr create_downloads_db_sync( - std::shared_ptr sqlite_connection); - -std::shared_ptr create_downloads_db_async( - std::shared_ptr sync_db, int32 scheduler_id); - -} // namespace td diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index a1f273feb..1286c35e6 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -3981,7 +3981,8 @@ void Td::init_managers() { send_closure(G()->td(), &Td::send_update, counters.to_td_api()); } void start_file(FileId file_id, int8 priority) final { - send_closure(G()->file_manager(), &FileManager::download, file_id, download_file_callback_, priority, -1, -1); + send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(), priority, -1, + -1); } void pause_file(FileId file_id) final { send_closure(G()->file_manager(), &FileManager::download, file_id, nullptr, 0, -1, -1); @@ -3996,11 +3997,6 @@ void Td::init_managers() { string get_unique_file_id(FileId file_id) final { return G()->file_manager().get_actor_unsafe()->get_file_view(file_id).get_unique_file_id(); } - string get_file_source_serialized(FileSourceId file_source_id) final { - TlStorerToString storer; - G()->file_reference_manager().get_actor_unsafe()->store_file_source(file_source_id, storer); - return storer.move_as_string(); - } private: ActorShared<> parent_; @@ -4010,7 +4006,7 @@ void Td::init_managers() { if (!download_file_callback_) { class Impl : public FileManager::DownloadCallback { public: - Impl(ActorId download_manager) : download_manager_(download_manager_) { + Impl(ActorId download_manager) : download_manager_(download_manager) { } void on_progress(FileId file_id) final { auto view = G()->file_manager().get_actor_unsafe()->get_file_view(file_id); diff --git a/td/telegram/TdDb.cpp b/td/telegram/TdDb.cpp index be2bbe6c6..65c057067 100644 --- a/td/telegram/TdDb.cpp +++ b/td/telegram/TdDb.cpp @@ -7,7 +7,6 @@ #include "td/telegram/TdDb.h" #include "td/telegram/DialogDb.h" -#include "td/telegram/DownloadsDb.h" #include "td/telegram/files/FileDb.h" #include "td/telegram/Global.h" #include "td/telegram/logevent/LogEvent.h" @@ -203,12 +202,6 @@ DialogDbSyncInterface *TdDb::get_dialog_db_sync() { DialogDbAsyncInterface *TdDb::get_dialog_db_async() { return dialog_db_async_.get(); } -DownloadsDbSyncInterface *TdDb::get_downloads_db_sync() { - return &downloads_db_sync_safe_->get(); -} -DownloadsDbAsyncInterface *TdDb::get_downloads_db_async() { - return downloads_db_async_.get(); -} CSlice TdDb::binlog_path() const { return binlog_->get_path(); @@ -272,11 +265,6 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) { dialog_db_async_->close(mpas.get_promise()); } - downloads_db_sync_safe_.reset(); - if (downloads_db_async_) { - downloads_db_async_->close(mpas.get_promise()); - } - // binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy CHECK(binlog_pmc_.unique()); binlog_pmc_.reset(); @@ -353,12 +341,6 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters ¶meters, con TRY_STATUS(drop_file_db(db, user_version)); } - if (use_downloads_db) { - TRY_STATUS(init_downloads_db(db, user_version)); - } else { - TRY_STATUS(drop_downloads_db(db, user_version)); - } - // Update 'PRAGMA user_version' auto db_version = current_db_version(); if (db_version != user_version) { diff --git a/td/telegram/TdDb.h b/td/telegram/TdDb.h index 73720b258..1370961d2 100644 --- a/td/telegram/TdDb.h +++ b/td/telegram/TdDb.h @@ -34,9 +34,6 @@ class FileDbInterface; class MessagesDbSyncInterface; class MessagesDbSyncSafeInterface; class MessagesDbAsyncInterface; -class DownloadsDbSyncInterface; -class DownloadsDbSyncSafeInterface; -class DownloadsDbAsyncInterface; class SqliteConnectionSafe; class SqliteKeyValueSafe; class SqliteKeyValueAsyncInterface; @@ -98,9 +95,6 @@ class TdDb { DialogDbSyncInterface *get_dialog_db_sync(); DialogDbAsyncInterface *get_dialog_db_async(); - DownloadsDbSyncInterface *get_downloads_db_sync(); - DownloadsDbAsyncInterface *get_downloads_db_async(); - void change_key(DbKey key, Promise<> promise); void with_db_path(const std::function &callback); @@ -119,9 +113,6 @@ class TdDb { std::shared_ptr messages_db_sync_safe_; std::shared_ptr messages_db_async_; - std::shared_ptr downloads_db_sync_safe_; - std::shared_ptr downloads_db_async_; - std::shared_ptr dialog_db_sync_safe_; std::shared_ptr dialog_db_async_; diff --git a/td/telegram/files/FileManager.cpp b/td/telegram/files/FileManager.cpp index 44f138fb6..95b731bfa 100644 --- a/td/telegram/files/FileManager.cpp +++ b/td/telegram/files/FileManager.cpp @@ -2205,6 +2205,10 @@ void FileManager::download(FileId file_id, std::shared_ptr cal } file_info->download_priority_ = narrow_cast(new_priority); file_info->download_callback_ = std::move(callback); + + if (file_info->download_callback_) { + file_info->download_callback_->on_progress(file_id); + } // TODO: send current progress? run_generate(node); diff --git a/td/telegram/files/FileSourceId.hpp b/td/telegram/files/FileSourceId.hpp new file mode 100644 index 000000000..b2c95255a --- /dev/null +++ b/td/telegram/files/FileSourceId.hpp @@ -0,0 +1,22 @@ +// +// Created by Arseny on 26.02.2022. +// + +#pragma once +#include "td/telegram/FileReferenceManager.hpp" +#include "td/telegram/files/FileSourceId.h" +#include "td/telegram/Td.h" + +namespace td { +template +void store(const FileSourceId &file_source_id, StorerT &storer) { + Td *td = storer.context()->td().get_actor_unsafe(); + td->file_reference_manager_->store_file_source(file_source_id, storer); +} + +template +void parse(FileSourceId &file_source_id, ParserT &parser) { + Td *td = parser.context()->td().get_actor_unsafe(); + file_source_id = td->file_reference_manager_->parse_file_source(td, parser); +} +} // namespace td