DownloadManager: WIP

This commit is contained in:
Arseny Smirnov 2022-02-26 19:55:12 +01:00
parent 4524ba0380
commit f13598ec83
10 changed files with 318 additions and 539 deletions

View File

@ -314,7 +314,6 @@ set(TDLIB_SOURCE
td/telegram/Document.cpp
td/telegram/DocumentsManager.cpp
td/telegram/DownloadManager.cpp
td/telegram/DownloadsDb.cpp
td/telegram/DraftMessage.cpp
td/telegram/FileReferenceManager.cpp
td/telegram/files/FileBitmask.cpp
@ -681,6 +680,7 @@ set(TDLIB_SOURCE
td/telegram/files/FileId.hpp
td/telegram/files/FileLocation.hpp
td/telegram/files/FileManager.hpp
td/telegram/files/FileSourceId.hpp
td/telegram/Game.hpp
td/telegram/InputMessageText.hpp
td/telegram/MessageEntity.hpp

View File

@ -6,14 +6,54 @@
//
#include "td/telegram/DownloadManager.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Hints.h"
#include "td/utils/tl_helpers.h"
#include "td/actor/MultiPromise.h"
#include "td/telegram/files/FileSourceId.hpp"
#include "td/telegram/DownloadsDb.h"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/TdDb.h"
#include "td/utils/algorithm.h"
#include "td/utils/FlatHashMap.h"
namespace td {
//TODO: replace LOG(ERROR) with something else
struct FileDownloadInDb {
int64 download_id{};
string unique_file_id;
FileSourceId file_source_id;
string search_text;
bool is_paused{};
int priority{};
template <class StorerT>
void store(StorerT &storer) const {
BEGIN_STORE_FLAGS();
STORE_FLAG(is_paused);
END_STORE_FLAGS();
td::store(download_id, storer);
td::store(unique_file_id, storer);
td::store(file_source_id, storer);
td::store(priority, storer);
}
template <class ParserT>
void parse(ParserT &parser) {
BEGIN_PARSE_FLAGS();
PARSE_FLAG(is_paused);
END_PARSE_FLAGS();
td::parse(download_id, parser);
td::parse(unique_file_id, parser);
td::parse(file_source_id, parser);
td::parse(priority, parser);
}
};
class DownloadManagerImpl final : public DownloadManager {
public:
void set_callback(unique_ptr<Callback> callback) final {
@ -25,12 +65,19 @@ class DownloadManagerImpl final : public DownloadManager {
if (!callback_) {
return Status::OK();
}
auto it = active_files_.find(file_id);
if (it == active_files_.end()) {
auto fit = by_file_id_.find(file_id);
if (fit == by_file_id_.end()) {
return Status::Error(400, "Can't find file");
}
auto it = files_.find(fit->second);
CHECK(it != files_.end());
auto &file_info = it->second;
if (!is_active(file_info)) {
return Status::Error(400, "File is already downloaded");
}
if (is_paused != file_info.is_paused) {
file_info.is_paused = is_paused;
if (is_paused) {
@ -40,7 +87,7 @@ class DownloadManagerImpl final : public DownloadManager {
}
}
// TODO: update db
sync_with_db(file_info);
return Status::OK();
}
@ -49,11 +96,11 @@ class DownloadManagerImpl final : public DownloadManager {
if (!callback_) {
return Status::Error("TODO: code and message`");
}
for (auto &it : active_files_) {
toggle_is_paused(it.first, is_paused);
for (auto &it : files_) {
toggle_is_paused(it.second.file_id, is_paused);
}
// TODO: update db
return Status::OK();
}
@ -61,8 +108,12 @@ class DownloadManagerImpl final : public DownloadManager {
if (!callback_) {
return Status::Error("TODO: code and message`");
}
auto it = active_files_.find(file_id);
if (it != active_files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) {
auto fit = by_file_id_.find(file_id);
if (fit == by_file_id_.end()) {
return Status::OK();
}
auto it = files_.find(fit->second);
if (it != files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) {
auto &file_info = it->second;
if (!file_info.is_paused) {
callback_->pause_file(file_info.internal_file_id);
@ -71,10 +122,16 @@ class DownloadManagerImpl final : public DownloadManager {
callback_->delete_file(file_info.internal_file_id);
}
by_internal_file_id_.erase(file_info.internal_file_id);
by_file_id_.erase(file_info.file_id);
hints_.remove(file_info.download_id);
counters_.total_count--;
counters_.downloaded_size -= file_info.downloaded_size;
counters_.total_size -= file_info.size;
active_files_.erase(it);
remove_from_db(file_info);
files_.erase(it);
}
// TODO: remove from db
return Status::OK();
}
@ -82,21 +139,21 @@ class DownloadManagerImpl final : public DownloadManager {
if (!callback_) {
return Status::Error("TODO: code and message`");
}
if (!only_completed) {
for (auto &it : active_files_) {
FileInfo &file_info = it.second;
if (!file_info.is_paused) {
callback_->pause_file(file_info.internal_file_id);
}
if (delete_from_cache) {
callback_->delete_file(file_info.internal_file_id);
}
//TODO(later): index?
std::vector<FileId> to_remove;
for (auto &it : files_) {
FileInfo &file_info = it.second;
if (only_active && !is_active(file_info)) {
continue;
}
active_files_.clear();
if (only_completed && is_active(file_info)) {
continue;
}
to_remove.push_back(file_info.file_id);
}
for (auto file_id : to_remove) {
remove_file(file_id, {}, delete_from_cache);
}
// TODO: remove from db. should respect only_active
// TODO: if delete_from_cache, should iterate all files in db
return Status::OK();
}
@ -104,24 +161,29 @@ class DownloadManagerImpl final : public DownloadManager {
if (!callback_) {
return Status::Error("TODO: code and message`");
}
// TODO: maybe we should ignore this query in some cases
remove_file(file_id, {}, false);
FileInfo file_info;
file_info.download_id = next_download_id();
file_info.file_id = file_id;
file_info.internal_file_id = callback_->dup_file_id(file_id);
file_info.file_source_id = file_source_id;
file_info.is_paused = false;
file_info.priority = priority;
by_internal_file_id_[file_info.internal_file_id] = file_id;
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
by_file_id_[file_info.file_id] = file_info.download_id;
hints_.add(file_info.download_id, search_by);
if (active_files_.count(file_id) == 0) {
counters_.total_count++;
callback_->update_counters(counters_);
}
active_files_[file_id] = file_info;
callback_->start_file(file_info.internal_file_id, file_info.priority);
G()->td_db()->get_downloads_db_async()->add_download(
DownloadsDbDownload{callback_->get_unique_file_id(file_id),
callback_->get_file_source_serialized(file_source_id), search_by, 0, priority},
[](Result<Unit>) {});
auto download_id = file_info.download_id;
file_info.need_save_to_db = true;
auto it = files_.emplace(download_id, file_info).first;
callback_->start_file(it->second.internal_file_id, it->second.priority);
sync_with_db(it->second);
counters_.total_count++;
update_counters();
return Status::OK();
}
void search(string query, bool only_active, bool only_completed, string offset, int32 limit,
@ -130,10 +192,42 @@ class DownloadManagerImpl final : public DownloadManager {
return promise.set_error(Status::Error("TODO: code and message`"));
}
TRY_RESULT_PROMISE(promise, offset_int64, to_integer_safe<int64>(offset));
// TODO: only active, only completed
G()->td_db()->get_downloads_db_async()->get_downloads_fts(DownloadsDbFtsQuery{query, offset_int64, limit},
[](Result<DownloadsDbFtsResult>) {});
return promise.set_value({});
auto ids = hints_.search(query, 200, true).second;
remove_if(ids, [&](auto download_id) {
auto it = files_.find(download_id);
if (it == files_.end()) {
return true;
}
auto &file_info = it->second;
if (only_active && !is_active(file_info)) {
return true;
}
if (only_completed && is_active(file_info)) {
return true;
}
return false;
});
std::sort(ids.begin(), ids.end(), std::greater<>());
FoundFileDownloads found;
found.total_count = narrow_cast<int32>(ids.size());
remove_if(ids, [offset_int64](auto id) { return id < offset_int64; });
if (static_cast<int32>(ids.size()) > limit) {
ids.resize(max(0, limit));
}
int64 last_id = 0;
found.file_downloads = transform(ids, [&](auto id) {
auto it = files_.find(id);
FileInfo &file_info = it->second;
FileDownload res;
res.is_paused = file_info.is_paused;
res.file_source_id = file_info.file_source_id;
res.file_id = it->second.file_id;
return res;
});
if (last_id != 0) {
found.offset = to_string(last_id);
}
return promise.set_value(std::move(found));
}
void update_file_download_state(FileId internal_file_id, int64 download_size, int64 size, bool is_paused) final {
@ -145,9 +239,10 @@ class DownloadManagerImpl final : public DownloadManager {
if (by_internal_file_id_it == by_internal_file_id_.end()) {
return;
}
auto it = active_files_.find(by_internal_file_id_it->second);
CHECK(it != active_files_.end());
auto it = files_.find(by_internal_file_id_it->second);
CHECK(it != files_.end());
auto &file_info = it->second;
counters_.downloaded_size -= file_info.downloaded_size;
counters_.total_size -= file_info.size;
file_info.size = size;
@ -156,15 +251,7 @@ class DownloadManagerImpl final : public DownloadManager {
counters_.total_size += file_info.size;
file_info.is_paused = is_paused;
if (download_size == size) {
active_files_.erase(it);
by_internal_file_id_.erase(by_internal_file_id_it);
if (active_files_.empty()) {
counters_ = {};
}
}
callback_->update_counters(counters_);
update_counters();
}
void update_file_deleted(FileId internal_file_id) final {
@ -172,43 +259,156 @@ class DownloadManagerImpl final : public DownloadManager {
return;
}
auto it = by_internal_file_id_.find(internal_file_id);
if (it == by_internal_file_id_.end()) {
auto fit = by_internal_file_id_.find(internal_file_id);
if (fit == by_internal_file_id_.end()) {
return;
}
remove_file(it->second, {}, false);
auto it = files_.find(fit->second);
CHECK(it != files_.end());
remove_file(it->second.file_id, {}, false);
}
private:
unique_ptr<Callback> callback_;
struct FileInfo {
int64 download_id{};
int8 priority;
bool is_paused{};
FileId file_id{};
FileId internal_file_id{};
FileSourceId file_source_id{};
int64 size{};
int64 downloaded_size{};
bool need_save_to_db{false};
};
FlatHashMap<FileId, FileInfo, FileIdHash> active_files_;
FlatHashMap<FileId, FileId, FileIdHash> by_internal_file_id_;
FlatHashMap<FileId, int64, FileIdHash> by_file_id_;
FlatHashMap<FileId, int64, FileIdHash> by_internal_file_id_;
FlatHashMap<int64, FileInfo> files_;
size_t active_files_count_{0};
Hints hints_;
Counters counters_;
Counters sent_counters_;
bool is_synchonized_{false};
bool is_started_{false};
int64 max_download_id_{0};
int64 next_download_id() {
auto res = ++max_download_id_;
G()->td_db()->get_binlog_pmc()->set("dlds_max_id", to_string(res));
return res;
}
bool is_active(const FileInfo &file_info) const {
return file_info.size == 0 || file_info.downloaded_size != file_info.size;
}
static std::string pmc_key(const FileInfo &file_info) {
return PSTRING() << "dlds" << file_info.download_id;
}
void sync_with_db(FileInfo &file_info) {
if (!file_info.need_save_to_db) {
return;
}
file_info.need_save_to_db = false;
FileDownloadInDb to_save;
to_save.download_id = file_info.download_id;
to_save.file_source_id = file_info.file_source_id;
to_save.is_paused = file_info.is_paused;
to_save.priority = file_info.priority;
to_save.unique_file_id = callback_->get_unique_file_id(file_info.file_id);
G()->td_db()->get_binlog_pmc()->set(pmc_key(file_info), log_event_store(to_save).as_slice().str());
}
void remove_from_db(const FileInfo &file_info) {
G()->td_db()->get_binlog_pmc()->erase(pmc_key(file_info));
}
void on_synchronized(Result<Unit>) {
LOG(ERROR) << "DownloadManager: synchornized";
is_synchonized_ = true;
update_counters();
}
void try_start() {
if (is_started_) {
return;
}
is_started_ = true;
auto serialized_counter = G()->td_db()->get_binlog_pmc()->get("dlds_counter");
Counters counters_from_db;
if (!serialized_counter.empty()) {
log_event_parse(counters_from_db, serialized_counter).ensure();
}
callback_->update_counters(counters_from_db);
max_download_id_ = to_integer<int64>(G()->td_db()->get_binlog_pmc()->get("dlds_max_id"));
auto downloads_in_kv = G()->td_db()->get_binlog_pmc()->prefix_get("dlds#");
MultiPromiseActorSafe mtps("DownloadManager: initialization");
mtps.add_promise(promise_send_closure(actor_id(this), &DownloadManagerImpl::on_synchronized));
for (auto &it : downloads_in_kv) {
Slice key = it.first;
Slice value = it.second;
FileDownloadInDb in_db;
log_event_parse(in_db, value).ensure();
in_db.download_id = to_integer_safe<int64>(key.substr(4)).move_as_ok();
auto promise = mtps.get_promise();
// TODO: load data from MessagesManager
auto unique_file_id = std::move(in_db.unique_file_id);
auto file_source_id = in_db.file_source_id;
auto new_promise = [self = actor_id(this), in_db = std::move(in_db),
promise = std::move(promise)](Result<FileId> res) mutable {
if (res.is_error()) {
promise.set_value({});
return;
}
send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db));
promise.set_value({});
};
// auto send_closure(G()->messages_manager(), &MessagesManager::todo, unique_file_id, file_source_id,
// std::move(new_promise));
}
}
void add_file_from_db(FileId file_id, FileDownloadInDb in_db) {
FileInfo file_info;
file_info.download_id = in_db.download_id;
file_info.file_id = file_id;
file_info.internal_file_id = callback_->dup_file_id(file_id);
file_info.file_source_id = in_db.file_source_id;
file_info.is_paused = false;
file_info.priority = narrow_cast<int8>(in_db.priority);
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
by_file_id_[file_info.file_id] = file_info.download_id;
//TODO: hints_.add(file_info.download_id, search_by);
auto download_id = file_info.download_id;
auto it = files_.emplace(download_id, file_info).first;
callback_->start_file(it->second.internal_file_id, it->second.priority);
}
void loop() final {
if (!callback_) {
return;
}
// TODO: ???
// TODO: load active files from db
auto downloads = G()->td_db()->get_downloads_db_sync()->get_active_downloads().move_as_ok();
for (auto &download : downloads.downloads) {
// ...
}
try_start();
}
void tear_down() final {
callback_.reset();
}
void update_counters() {
if (!is_synchonized_) {
return;
}
if (counters_ == sent_counters_) {
return;
}
sent_counters_ = counters_;
callback_->update_counters(counters_);
}
};
unique_ptr<DownloadManager> DownloadManager::create() {
@ -221,4 +421,20 @@ tl_object_ptr<td_api::foundFileDownloads> DownloadManager::FoundFileDownloads::t
tl_object_ptr<td_api::updateFileDownloads> DownloadManager::Counters::to_td_api() const {
return make_tl_object<td_api::updateFileDownloads>(total_size, total_count, downloaded_size);
}
template <class StorerT>
void DownloadManager::Counters::store(StorerT &storer) const {
BEGIN_STORE_FLAGS();
END_STORE_FLAGS();
td::store(total_size, storer);
td::store(total_count, storer);
td::store(downloaded_size, storer);
}
template <class ParserT>
void DownloadManager::Counters::parse(ParserT &parser) {
BEGIN_PARSE_FLAGS();
END_PARSE_FLAGS();
td::parse(total_count, parser);
td::parse(downloaded_size, parser);
}
} // namespace td

View File

@ -15,6 +15,7 @@
#include "td/utils/common.h"
#include "td/utils/Status.h"
#include "td/utils/tl_helpers.h"
namespace td {
@ -28,7 +29,16 @@ class DownloadManager : public Actor {
int32 total_count{};
int64 downloaded_size{};
bool operator==(const Counters &other) const {
return total_size == other.total_size && total_count == other.total_count && downloaded_size == other.downloaded_size;
}
tl_object_ptr<td_api::updateFileDownloads> to_td_api() const;
template <class StorerT>
void store(StorerT &storer) const;
template <class ParserT>
void parse(ParserT &parser);
};
struct FileDownload {
@ -57,7 +67,6 @@ class DownloadManager : public Actor {
virtual FileId dup_file_id(FileId file_id) = 0;
virtual string get_unique_file_id(FileId file_id) = 0;
virtual string get_file_source_serialized(FileSourceId file_source_id) = 0;
};
//

View File

@ -1,334 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/DownloadsDb.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/Version.h"
#include "td/db/SqliteConnectionSafe.h"
#include "td/db/SqliteDb.h"
#include "td/db/SqliteStatement.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/SchedulerLocalStorage.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/StackAllocator.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/Time.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/unicode.h"
#include "td/utils/utf8.h"
#include <algorithm>
#include <array>
#include <iterator>
#include <limits>
#include <tuple>
#include <utility>
namespace td {
static constexpr int32 MESSAGES_DB_INDEX_COUNT = 30;
static constexpr int32 MESSAGES_DB_INDEX_COUNT_OLD = 9;
// NB: must happen inside a transaction
Status init_downloads_db(SqliteDb &db, int32 version) {
LOG(INFO) << "Init downloads database " << tag("version", version);
// Check if database exists
TRY_RESULT(has_table, db.has_table("downloads"));
if (!has_table) {
version = 0;
}
auto add_fts = [&db] {
TRY_STATUS(
db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS downloads_fts USING fts5(search_text, content='downloads', "
"content_rowid='download_id', tokenize = \"unicode61 remove_diacritics 0 tokenchars '\a'\")"));
TRY_STATUS(
db.exec("CREATE TRIGGER IF NOT EXISTS trigger_downloads_fts_delete BEFORE DELETE ON downloads"
" BEGIN INSERT INTO downloads_fts(downloads_fts, rowid, search_text) VALUES(\'delete\', "
"OLD.download_id, OLD.search_text); END"));
TRY_STATUS(
db.exec("CREATE TRIGGER IF NOT EXISTS trigger_downloads_fts_insert AFTER INSERT ON downloads"
" BEGIN INSERT INTO downloads_fts(rowid, search_text) VALUES(NEW.download_id, NEW.search_text); END"));
// TODO: update?
return Status::OK();
};
if (version == 0) {
TRY_STATUS(
db.exec("CREATE TABLE IF NOT EXISTS downloads(download_id INT8 PRIMARY KEY, unique_file_id "
"BLOB UNIQUE, file_source BLOB, search_text STRING, date INT4, priority INT4)"));
// TODO: add indexes
// TRY_STATUS(
// db.exec("CREATE INDEX IF NOT EXISTS message_by_random_id ON messages (dialog_id, random_id) "
// "WHERE random_id IS NOT NULL"));
TRY_STATUS(add_fts());
version = current_db_version();
}
return Status::OK();
}
// NB: must happen inside a transaction
Status drop_downloads_db(SqliteDb &db, int32 version) {
LOG(WARNING) << "Drop downloads database " << tag("version", version)
<< tag("current_db_version", current_db_version());
return db.exec("DROP TABLE IF EXISTS downloads");
}
class DownloadsDbImpl final : public DownloadsDbSyncInterface {
public:
explicit DownloadsDbImpl(SqliteDb db) : db_(std::move(db)) {
init().ensure();
}
Status init() {
TRY_RESULT_ASSIGN(add_download_stmt_,
db_.get_statement("INSERT OR REPLACE INTO downloads VALUES(NULL, ?1, ?2, ?3, ?4, ?5)"));
TRY_RESULT_ASSIGN(
get_downloads_fts_stmt_,
db_.get_statement("SELECT download_id, unique_file_id, file_source, priority FROM downloads WHERE download_id "
"IN (SELECT rowid FROM downloads_fts WHERE downloads_fts MATCH ?1 AND rowid < ?2 "
"ORDER BY rowid DESC LIMIT ?3) ORDER BY download_id DESC"));
// LOG(ERROR) << get_message_stmt_.explain().ok();
// LOG(ERROR) << get_messages_from_notification_id_stmt.explain().ok();
// LOG(ERROR) << get_message_by_random_id_stmt_.explain().ok();
// LOG(ERROR) << get_message_by_unique_message_id_stmt_.explain().ok();
// LOG(ERROR) << get_expiring_messages_stmt_.explain().ok();
// LOG(ERROR) << get_expiring_messages_helper_stmt_.explain().ok();
// LOG(FATAL) << "EXPLAINED";
return Status::OK();
}
Result<DownloadsDbFtsResult> get_downloads_fts(DownloadsDbFtsQuery query) final {
SCOPE_EXIT {
get_downloads_fts_stmt_.reset();
};
auto &stmt = get_downloads_fts_stmt_;
stmt.bind_string(1, query.query).ensure();
stmt.bind_int64(2, query.offset).ensure();
stmt.bind_int32(3, query.limit).ensure();
DownloadsDbFtsResult result;
auto status = stmt.step();
if (status.is_error()) {
LOG(ERROR) << status;
return std::move(result);
}
while (stmt.has_row()) {
int64 download_id{stmt.view_int64(0)};
string unique_file_id{stmt.view_string(1).str()};
string file_source{stmt.view_string(2).str()};
int32 priority{stmt.view_int32(3)};
result.next_download_id = download_id;
result.downloads.push_back(DownloadsDbDownloadShort{std::move(unique_file_id), std::move(file_source), priority});
stmt.step().ensure();
}
return std::move(result);
}
Status begin_write_transaction() final {
return db_.begin_write_transaction();
}
Status commit_transaction() final {
return db_.commit_transaction();
}
Status add_download(DownloadsDbDownload download) override {
SCOPE_EXIT {
add_download_stmt_.reset();
};
auto &stmt = add_download_stmt_;
TRY_RESULT_ASSIGN(add_download_stmt_,
db_.get_statement("INSERT OR REPLACE INTO downloads VALUES(NULL, ?1, ?2, ?3, ?4, ?5)"));
stmt.bind_blob(1, download.unique_file_id).ensure();
stmt.bind_blob(2, download.file_source).ensure();
stmt.bind_string(3, download.search_text).ensure();
stmt.bind_int32(4, download.date).ensure();
stmt.bind_int32(5, download.priority).ensure();
stmt.step().ensure();
return Status();
}
Result<GetActiveDownloadsResult> get_active_downloads() override {
DownloadsDbFtsQuery query;
query.limit = 2000;
query.offset = uint64(1) << 60;
// TODO: optimize query
// TODO: only active
TRY_RESULT(ans, get_downloads_fts(query));
return GetActiveDownloadsResult{std::move(ans.downloads)};
}
private:
SqliteDb db_;
SqliteStatement add_download_stmt_;
SqliteStatement get_downloads_fts_stmt_;
};
std::shared_ptr<DownloadsDbSyncSafeInterface> create_downloads_db_sync(
std::shared_ptr<SqliteConnectionSafe> sqlite_connection) {
class DownloadsDbSyncSafe final : public DownloadsDbSyncSafeInterface {
public:
explicit DownloadsDbSyncSafe(std::shared_ptr<SqliteConnectionSafe> sqlite_connection)
: lsls_db_([safe_connection = std::move(sqlite_connection)] {
return make_unique<DownloadsDbImpl>(safe_connection->get().clone());
}) {
}
DownloadsDbSyncInterface &get() final {
return *lsls_db_.get();
}
private:
LazySchedulerLocalStorage<unique_ptr<DownloadsDbSyncInterface>> lsls_db_;
};
return std::make_shared<DownloadsDbSyncSafe>(std::move(sqlite_connection));
}
class DownloadsDbAsync final : public DownloadsDbAsyncInterface {
public:
DownloadsDbAsync(std::shared_ptr<DownloadsDbSyncSafeInterface> sync_db, int32 scheduler_id) {
impl_ = create_actor_on_scheduler<Impl>("DownloadsDbActor", scheduler_id, std::move(sync_db));
}
void add_download(DownloadsDbDownload query, Promise<> promise) final {
send_closure(impl_, &Impl::add_download, std::move(query), std::move(promise));
}
void get_active_downloads(Promise<GetActiveDownloadsResult> promise) final {
send_closure(impl_, &Impl::get_active_downloads, std::move(promise));
}
void get_downloads_fts(DownloadsDbFtsQuery query, Promise<DownloadsDbFtsResult> promise) final {
send_closure(impl_, &Impl::get_downloads_fts, std::move(query), std::move(promise));
}
void close(Promise<> promise) final {
send_closure_later(impl_, &Impl::close, std::move(promise));
}
void force_flush() final {
send_closure_later(impl_, &Impl::force_flush);
}
private:
class Impl final : public Actor {
public:
explicit Impl(std::shared_ptr<DownloadsDbSyncSafeInterface> sync_db_safe) : sync_db_safe_(std::move(sync_db_safe)) {
}
void add_download(DownloadsDbDownload query, Promise<> promise) {
add_write_query([this, query = std::move(query), promise = std::move(promise)](Unit) mutable {
on_write_result(std::move(promise), sync_db_->add_download(std::move(query)));
});
}
void get_downloads_fts(DownloadsDbFtsQuery query, Promise<DownloadsDbFtsResult> promise) {
add_read_query();
promise.set_result(sync_db_->get_downloads_fts(std::move(query)));
}
void get_active_downloads(Promise<GetActiveDownloadsResult> promise) {
add_read_query();
promise.set_result(sync_db_->get_active_downloads());
}
void close(Promise<> promise) {
do_flush();
sync_db_safe_.reset();
sync_db_ = nullptr;
promise.set_value(Unit());
stop();
}
void force_flush() {
LOG(INFO) << "DownloadsDb flushed";
do_flush();
}
private:
std::shared_ptr<DownloadsDbSyncSafeInterface> sync_db_safe_;
DownloadsDbSyncInterface *sync_db_ = nullptr;
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_
vector<std::pair<Promise<>, Status>> pending_write_results_;
vector<Promise<>> pending_writes_;
double wakeup_at_ = 0;
void on_write_result(Promise<> promise, Status status) {
// We are inside a transaction and don't know how to handle the error
status.ensure();
pending_write_results_.emplace_back(std::move(promise), std::move(status));
}
template <class F>
void add_write_query(F &&f) {
pending_writes_.push_back(PromiseCreator::lambda(std::forward<F>(f), PromiseCreator::Ignore()));
if (pending_writes_.size() > MAX_PENDING_QUERIES_COUNT) {
do_flush();
wakeup_at_ = 0;
} else if (wakeup_at_ == 0) {
wakeup_at_ = Time::now_cached() + MAX_PENDING_QUERIES_DELAY;
}
if (wakeup_at_ != 0) {
set_timeout_at(wakeup_at_);
}
}
void add_read_query() {
do_flush();
}
void do_flush() {
if (pending_writes_.empty()) {
return;
}
sync_db_->begin_write_transaction().ensure();
for (auto &query : pending_writes_) {
query.set_value(Unit());
}
sync_db_->commit_transaction().ensure();
pending_writes_.clear();
for (auto &p : pending_write_results_) {
p.first.set_result(std::move(p.second));
}
pending_write_results_.clear();
cancel_timeout();
}
void timeout_expired() final {
do_flush();
}
void start_up() final {
sync_db_ = &sync_db_safe_->get();
}
};
ActorOwn<Impl> impl_;
};
std::shared_ptr<DownloadsDbAsyncInterface> create_downloads_db_async(
std::shared_ptr<DownloadsDbSyncSafeInterface> sync_db, int32 scheduler_id) {
return std::make_shared<DownloadsDbAsync>(std::move(sync_db), scheduler_id);
}
} // namespace td

View File

@ -1,107 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/DialogId.h"
#include "td/telegram/FullMessageId.h"
#include "td/telegram/MessageId.h"
#include "td/telegram/MessageSearchFilter.h"
#include "td/telegram/NotificationId.h"
#include "td/telegram/ServerMessageId.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Status.h"
#include <memory>
#include <utility>
namespace td {
class SqliteConnectionSafe;
class SqliteDb;
struct DownloadsDbFtsQuery {
string query;
int64 offset{0};
int32 limit{0};
};
struct DownloadsDbDownloadShort {
string unique_file_id;
string file_source;
int32 priority;
};
struct DownloadsDbDownload {
string unique_file_id;
string file_source;
string search_text;
int32 date;
int32 priority;
};
struct GetActiveDownloadsResult {
std::vector<DownloadsDbDownloadShort> downloads;
};
struct DownloadsDbFtsResult {
vector<DownloadsDbDownloadShort> downloads;
int64 next_download_id{};
};
class DownloadsDbSyncInterface {
public:
DownloadsDbSyncInterface() = default;
DownloadsDbSyncInterface(const DownloadsDbSyncInterface &) = delete;
DownloadsDbSyncInterface &operator=(const DownloadsDbSyncInterface &) = delete;
virtual ~DownloadsDbSyncInterface() = default;
virtual Status add_download(DownloadsDbDownload) = 0;
virtual Result<GetActiveDownloadsResult> get_active_downloads() = 0;
virtual Result<DownloadsDbFtsResult> get_downloads_fts(DownloadsDbFtsQuery query) = 0;
virtual Status begin_write_transaction() = 0;
virtual Status commit_transaction() = 0;
};
class DownloadsDbSyncSafeInterface {
public:
DownloadsDbSyncSafeInterface() = default;
DownloadsDbSyncSafeInterface(const DownloadsDbSyncSafeInterface &) = delete;
DownloadsDbSyncSafeInterface &operator=(const DownloadsDbSyncSafeInterface &) = delete;
virtual ~DownloadsDbSyncSafeInterface() = default;
virtual DownloadsDbSyncInterface &get() = 0;
};
class DownloadsDbAsyncInterface {
public:
DownloadsDbAsyncInterface() = default;
DownloadsDbAsyncInterface(const DownloadsDbAsyncInterface &) = delete;
DownloadsDbAsyncInterface &operator=(const DownloadsDbAsyncInterface &) = delete;
virtual ~DownloadsDbAsyncInterface() = default;
virtual void add_download(DownloadsDbDownload, Promise<>) = 0;
virtual void get_active_downloads(Promise<GetActiveDownloadsResult>) = 0;
virtual void get_downloads_fts(DownloadsDbFtsQuery query, Promise<DownloadsDbFtsResult>) = 0;
virtual void close(Promise<> promise) = 0;
virtual void force_flush() = 0;
};
Status init_downloads_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
Status drop_downloads_db(SqliteDb &db, int version) TD_WARN_UNUSED_RESULT;
std::shared_ptr<DownloadsDbSyncSafeInterface> create_downloads_db_sync(
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
std::shared_ptr<DownloadsDbAsyncInterface> create_downloads_db_async(
std::shared_ptr<DownloadsDbSyncSafeInterface> sync_db, int32 scheduler_id);
} // namespace td

View File

@ -3981,7 +3981,8 @@ void Td::init_managers() {
send_closure(G()->td(), &Td::send_update, counters.to_td_api());
}
void start_file(FileId file_id, int8 priority) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, download_file_callback_, priority, -1, -1);
send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(), priority, -1,
-1);
}
void pause_file(FileId file_id) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, nullptr, 0, -1, -1);
@ -3996,11 +3997,6 @@ void Td::init_managers() {
string get_unique_file_id(FileId file_id) final {
return G()->file_manager().get_actor_unsafe()->get_file_view(file_id).get_unique_file_id();
}
string get_file_source_serialized(FileSourceId file_source_id) final {
TlStorerToString storer;
G()->file_reference_manager().get_actor_unsafe()->store_file_source(file_source_id, storer);
return storer.move_as_string();
}
private:
ActorShared<> parent_;
@ -4010,7 +4006,7 @@ void Td::init_managers() {
if (!download_file_callback_) {
class Impl : public FileManager::DownloadCallback {
public:
Impl(ActorId<DownloadManager> download_manager) : download_manager_(download_manager_) {
Impl(ActorId<DownloadManager> download_manager) : download_manager_(download_manager) {
}
void on_progress(FileId file_id) final {
auto view = G()->file_manager().get_actor_unsafe()->get_file_view(file_id);

View File

@ -7,7 +7,6 @@
#include "td/telegram/TdDb.h"
#include "td/telegram/DialogDb.h"
#include "td/telegram/DownloadsDb.h"
#include "td/telegram/files/FileDb.h"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
@ -203,12 +202,6 @@ DialogDbSyncInterface *TdDb::get_dialog_db_sync() {
DialogDbAsyncInterface *TdDb::get_dialog_db_async() {
return dialog_db_async_.get();
}
DownloadsDbSyncInterface *TdDb::get_downloads_db_sync() {
return &downloads_db_sync_safe_->get();
}
DownloadsDbAsyncInterface *TdDb::get_downloads_db_async() {
return downloads_db_async_.get();
}
CSlice TdDb::binlog_path() const {
return binlog_->get_path();
@ -272,11 +265,6 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
dialog_db_async_->close(mpas.get_promise());
}
downloads_db_sync_safe_.reset();
if (downloads_db_async_) {
downloads_db_async_->close(mpas.get_promise());
}
// binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy
CHECK(binlog_pmc_.unique());
binlog_pmc_.reset();
@ -353,12 +341,6 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters &parameters, con
TRY_STATUS(drop_file_db(db, user_version));
}
if (use_downloads_db) {
TRY_STATUS(init_downloads_db(db, user_version));
} else {
TRY_STATUS(drop_downloads_db(db, user_version));
}
// Update 'PRAGMA user_version'
auto db_version = current_db_version();
if (db_version != user_version) {

View File

@ -34,9 +34,6 @@ class FileDbInterface;
class MessagesDbSyncInterface;
class MessagesDbSyncSafeInterface;
class MessagesDbAsyncInterface;
class DownloadsDbSyncInterface;
class DownloadsDbSyncSafeInterface;
class DownloadsDbAsyncInterface;
class SqliteConnectionSafe;
class SqliteKeyValueSafe;
class SqliteKeyValueAsyncInterface;
@ -98,9 +95,6 @@ class TdDb {
DialogDbSyncInterface *get_dialog_db_sync();
DialogDbAsyncInterface *get_dialog_db_async();
DownloadsDbSyncInterface *get_downloads_db_sync();
DownloadsDbAsyncInterface *get_downloads_db_async();
void change_key(DbKey key, Promise<> promise);
void with_db_path(const std::function<void(CSlice)> &callback);
@ -119,9 +113,6 @@ class TdDb {
std::shared_ptr<MessagesDbSyncSafeInterface> messages_db_sync_safe_;
std::shared_ptr<MessagesDbAsyncInterface> messages_db_async_;
std::shared_ptr<DownloadsDbSyncSafeInterface> downloads_db_sync_safe_;
std::shared_ptr<DownloadsDbAsyncInterface> downloads_db_async_;
std::shared_ptr<DialogDbSyncSafeInterface> dialog_db_sync_safe_;
std::shared_ptr<DialogDbAsyncInterface> dialog_db_async_;

View File

@ -2205,6 +2205,10 @@ void FileManager::download(FileId file_id, std::shared_ptr<DownloadCallback> cal
}
file_info->download_priority_ = narrow_cast<int8>(new_priority);
file_info->download_callback_ = std::move(callback);
if (file_info->download_callback_) {
file_info->download_callback_->on_progress(file_id);
}
// TODO: send current progress?
run_generate(node);

View File

@ -0,0 +1,22 @@
//
// Created by Arseny on 26.02.2022.
//
#pragma once
#include "td/telegram/FileReferenceManager.hpp"
#include "td/telegram/files/FileSourceId.h"
#include "td/telegram/Td.h"
namespace td {
template <class StorerT>
void store(const FileSourceId &file_source_id, StorerT &storer) {
Td *td = storer.context()->td().get_actor_unsafe();
td->file_reference_manager_->store_file_source(file_source_id, storer);
}
template <class ParserT>
void parse(FileSourceId &file_source_id, ParserT &parser) {
Td *td = parser.context()->td().get_actor_unsafe();
file_source_id = td->file_reference_manager_->parse_file_source(td, parser);
}
} // namespace td