DownloadManager: WIP
This commit is contained in:
parent
f13598ec83
commit
b2ae91ef48
@ -507,7 +507,6 @@ set(TDLIB_SOURCE
|
||||
td/telegram/Document.h
|
||||
td/telegram/DocumentsManager.h
|
||||
td/telegram/DownloadManager.h
|
||||
td/telegram/DownloadsDb.h
|
||||
td/telegram/DraftMessage.h
|
||||
td/telegram/EncryptedFile.h
|
||||
td/telegram/FileReferenceManager.h
|
||||
|
@ -11,25 +11,25 @@
|
||||
#include "td/utils/tl_helpers.h"
|
||||
|
||||
#include "td/actor/MultiPromise.h"
|
||||
#include "td/telegram/FileReferenceManager.h"
|
||||
#include "td/telegram/files/FileSourceId.hpp"
|
||||
#include "td/telegram/DownloadsDb.h"
|
||||
#include "td/telegram/Global.h"
|
||||
#include "td/telegram/logevent/LogEvent.h"
|
||||
#include "td/telegram/TdDb.h"
|
||||
#include "td/utils/algorithm.h"
|
||||
|
||||
#include "td/utils/FlatHashMap.h"
|
||||
|
||||
namespace td {
|
||||
|
||||
//TODO: replace LOG(ERROR) with something else
|
||||
struct FileDownloadInDb {
|
||||
int64 download_id{};
|
||||
string unique_file_id;
|
||||
FileSourceId file_source_id;
|
||||
string search_text;
|
||||
bool is_paused{};
|
||||
int priority{};
|
||||
int created_at{};
|
||||
int completed_at{};
|
||||
|
||||
bool is_paused{};
|
||||
|
||||
template <class StorerT>
|
||||
void store(StorerT &storer) const {
|
||||
@ -40,6 +40,8 @@ struct FileDownloadInDb {
|
||||
td::store(unique_file_id, storer);
|
||||
td::store(file_source_id, storer);
|
||||
td::store(priority, storer);
|
||||
td::store(created_at, storer);
|
||||
td::store(completed_at, storer);
|
||||
}
|
||||
|
||||
template <class ParserT>
|
||||
@ -51,6 +53,8 @@ struct FileDownloadInDb {
|
||||
td::parse(unique_file_id, parser);
|
||||
td::parse(file_source_id, parser);
|
||||
td::parse(priority, parser);
|
||||
td::parse(created_at, parser);
|
||||
td::parse(completed_at, parser);
|
||||
}
|
||||
};
|
||||
|
||||
@ -62,24 +66,19 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
}
|
||||
|
||||
Status toggle_is_paused(FileId file_id, bool is_paused) final {
|
||||
if (!callback_) {
|
||||
return Status::OK();
|
||||
}
|
||||
TRY_STATUS(check_is_active());
|
||||
|
||||
auto fit = by_file_id_.find(file_id);
|
||||
if (fit == by_file_id_.end()) {
|
||||
return Status::Error(400, "Can't find file");
|
||||
}
|
||||
auto it = files_.find(fit->second);
|
||||
CHECK(it != files_.end());
|
||||
|
||||
auto &file_info = it->second;
|
||||
TRY_RESULT(file_info_ptr, get_file_info(file_id));
|
||||
auto &file_info = *file_info_ptr;
|
||||
if (!is_active(file_info)) {
|
||||
return Status::Error(400, "File is already downloaded");
|
||||
}
|
||||
|
||||
if (is_paused != file_info.is_paused) {
|
||||
file_info.is_paused = is_paused;
|
||||
with_file_info(file_info, [&](auto &file_info) {
|
||||
file_info.is_paused = is_paused;
|
||||
file_info.need_save_to_db = true;
|
||||
});
|
||||
if (is_paused) {
|
||||
callback_->pause_file(file_info.internal_file_id);
|
||||
} else {
|
||||
@ -87,15 +86,11 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
}
|
||||
}
|
||||
|
||||
sync_with_db(file_info);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status toggle_all_is_paused(bool is_paused) final {
|
||||
if (!callback_) {
|
||||
return Status::Error("TODO: code and message`");
|
||||
}
|
||||
TRY_STATUS(check_is_active());
|
||||
|
||||
for (auto &it : files_) {
|
||||
toggle_is_paused(it.second.file_id, is_paused);
|
||||
@ -105,41 +100,34 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
}
|
||||
|
||||
Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) final {
|
||||
if (!callback_) {
|
||||
return Status::Error("TODO: code and message`");
|
||||
TRY_STATUS(check_is_active());
|
||||
TRY_RESULT(file_info_ptr, get_file_info(file_id, file_source_id));
|
||||
auto &file_info = *file_info_ptr;
|
||||
if (!file_info.is_paused) {
|
||||
callback_->pause_file(file_info.internal_file_id);
|
||||
}
|
||||
auto fit = by_file_id_.find(file_id);
|
||||
if (fit == by_file_id_.end()) {
|
||||
return Status::OK();
|
||||
}
|
||||
auto it = files_.find(fit->second);
|
||||
if (it != files_.end() && (!file_source_id.is_valid() || file_source_id == it->second.file_source_id)) {
|
||||
auto &file_info = it->second;
|
||||
if (!file_info.is_paused) {
|
||||
callback_->pause_file(file_info.internal_file_id);
|
||||
}
|
||||
if (delete_from_cache) {
|
||||
callback_->delete_file(file_info.internal_file_id);
|
||||
}
|
||||
by_internal_file_id_.erase(file_info.internal_file_id);
|
||||
by_file_id_.erase(file_info.file_id);
|
||||
hints_.remove(file_info.download_id);
|
||||
counters_.total_count--;
|
||||
counters_.downloaded_size -= file_info.downloaded_size;
|
||||
counters_.total_size -= file_info.size;
|
||||
|
||||
remove_from_db(file_info);
|
||||
files_.erase(it);
|
||||
if (delete_from_cache) {
|
||||
callback_->delete_file(file_info.internal_file_id);
|
||||
}
|
||||
by_internal_file_id_.erase(file_info.internal_file_id);
|
||||
by_file_id_.erase(file_info.file_id);
|
||||
hints_.remove(file_info.download_id);
|
||||
unregister_file_info(file_info);
|
||||
|
||||
remove_from_db(file_info);
|
||||
files_.erase(file_info.download_id);
|
||||
return Status::OK();
|
||||
}
|
||||
Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) final {
|
||||
TRY_STATUS(check_is_active());
|
||||
TRY_RESULT(file_info_ptr, get_file_info(file_id, file_source_id));
|
||||
auto &file_info = *file_info_ptr;
|
||||
hints_.add(file_info.download_id, search_by);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) final {
|
||||
if (!callback_) {
|
||||
return Status::Error("TODO: code and message`");
|
||||
}
|
||||
//TODO(later): index?
|
||||
TRY_STATUS(check_is_active());
|
||||
std::vector<FileId> to_remove;
|
||||
for (auto &it : files_) {
|
||||
FileInfo &file_info = it.second;
|
||||
@ -158,9 +146,7 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
}
|
||||
|
||||
Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) final {
|
||||
if (!callback_) {
|
||||
return Status::Error("TODO: code and message`");
|
||||
}
|
||||
TRY_STATUS(check_is_active());
|
||||
|
||||
// TODO: maybe we should ignore this query in some cases
|
||||
remove_file(file_id, {}, false);
|
||||
@ -172,6 +158,7 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
file_info.file_source_id = file_source_id;
|
||||
file_info.is_paused = false;
|
||||
file_info.priority = priority;
|
||||
file_info.created_at = static_cast<int32>(G()->server_time());
|
||||
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
|
||||
by_file_id_[file_info.file_id] = file_info.download_id;
|
||||
hints_.add(file_info.download_id, search_by);
|
||||
@ -179,26 +166,29 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
auto download_id = file_info.download_id;
|
||||
file_info.need_save_to_db = true;
|
||||
auto it = files_.emplace(download_id, file_info).first;
|
||||
register_file_info(it->second);
|
||||
callback_->start_file(it->second.internal_file_id, it->second.priority);
|
||||
sync_with_db(it->second);
|
||||
counters_.total_count++;
|
||||
update_counters();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void search(string query, bool only_active, bool only_completed, string offset, int32 limit,
|
||||
Promise<FoundFileDownloads> promise) final {
|
||||
if (!callback_) {
|
||||
return promise.set_error(Status::Error("TODO: code and message`"));
|
||||
TRY_STATUS_PROMISE(promise, check_is_active());
|
||||
int64 offset_int64 = std::numeric_limits<int64>::max();
|
||||
if (!offset.empty()) {
|
||||
auto r_offset = to_integer_safe<int64>(offset);
|
||||
if (r_offset.is_error()) {
|
||||
return promise.set_error(Status::Error(400, "Invalid offset"));
|
||||
}
|
||||
offset_int64 = r_offset.move_as_ok();
|
||||
}
|
||||
TRY_RESULT_PROMISE(promise, offset_int64, to_integer_safe<int64>(offset));
|
||||
auto ids = hints_.search(query, 200, true).second;
|
||||
auto ids = query.empty() ? transform(files_, [](auto &it) { return it.first; }) : hints_.search(query, 200).second;
|
||||
remove_if(ids, [&](auto download_id) {
|
||||
auto it = files_.find(download_id);
|
||||
if (it == files_.end()) {
|
||||
auto r = get_file_info(download_id);
|
||||
if (r.is_error()) {
|
||||
return true;
|
||||
}
|
||||
auto &file_info = it->second;
|
||||
auto &file_info = *r.ok();
|
||||
if (only_active && !is_active(file_info)) {
|
||||
return true;
|
||||
}
|
||||
@ -222,6 +212,8 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
res.is_paused = file_info.is_paused;
|
||||
res.file_source_id = file_info.file_source_id;
|
||||
res.file_id = it->second.file_id;
|
||||
res.add_date = file_info.created_at;
|
||||
res.complete_date = file_info.completed_at;
|
||||
return res;
|
||||
});
|
||||
if (last_id != 0) {
|
||||
@ -234,24 +226,20 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
if (!callback_) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto by_internal_file_id_it = by_internal_file_id_.find(internal_file_id);
|
||||
if (by_internal_file_id_it == by_internal_file_id_.end()) {
|
||||
auto r_file_info_ptr = get_file_info_by_internal(internal_file_id);
|
||||
if (r_file_info_ptr.is_error()) {
|
||||
return;
|
||||
}
|
||||
auto it = files_.find(by_internal_file_id_it->second);
|
||||
CHECK(it != files_.end());
|
||||
auto &file_info = it->second;
|
||||
auto &file_info = *r_file_info_ptr.ok();
|
||||
|
||||
counters_.downloaded_size -= file_info.downloaded_size;
|
||||
counters_.total_size -= file_info.size;
|
||||
file_info.size = size;
|
||||
file_info.downloaded_size = download_size;
|
||||
counters_.downloaded_size += file_info.downloaded_size;
|
||||
counters_.total_size += file_info.size;
|
||||
file_info.is_paused = is_paused;
|
||||
|
||||
update_counters();
|
||||
with_file_info(file_info, [&](FileInfo &file_info) {
|
||||
file_info.size = size;
|
||||
file_info.downloaded_size = download_size;
|
||||
if (file_info.is_paused != is_paused) {
|
||||
file_info.is_paused = is_paused;
|
||||
file_info.need_save_to_db = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void update_file_deleted(FileId internal_file_id) final {
|
||||
@ -259,13 +247,12 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
return;
|
||||
}
|
||||
|
||||
auto fit = by_internal_file_id_.find(internal_file_id);
|
||||
if (fit == by_internal_file_id_.end()) {
|
||||
auto r_file_info_ptr = get_file_info_by_internal(internal_file_id);
|
||||
if (r_file_info_ptr.is_error()) {
|
||||
return;
|
||||
}
|
||||
auto it = files_.find(fit->second);
|
||||
CHECK(it != files_.end());
|
||||
remove_file(it->second.file_id, {}, false);
|
||||
auto &file_info = *r_file_info_ptr.ok();
|
||||
remove_file(file_info.file_id, {}, false);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -279,8 +266,10 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
FileSourceId file_source_id{};
|
||||
int64 size{};
|
||||
int64 downloaded_size{};
|
||||
int32 created_at{};
|
||||
int32 completed_at{};
|
||||
|
||||
bool need_save_to_db{false};
|
||||
mutable bool need_save_to_db{false};
|
||||
};
|
||||
|
||||
FlatHashMap<FileId, int64, FileIdHash> by_file_id_;
|
||||
@ -307,7 +296,7 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
static std::string pmc_key(const FileInfo &file_info) {
|
||||
return PSTRING() << "dlds" << file_info.download_id;
|
||||
}
|
||||
void sync_with_db(FileInfo &file_info) {
|
||||
void sync_with_db(const FileInfo &file_info) {
|
||||
if (!file_info.need_save_to_db) {
|
||||
return;
|
||||
}
|
||||
@ -354,11 +343,10 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
log_event_parse(in_db, value).ensure();
|
||||
in_db.download_id = to_integer_safe<int64>(key.substr(4)).move_as_ok();
|
||||
auto promise = mtps.get_promise();
|
||||
// TODO: load data from MessagesManager
|
||||
auto unique_file_id = std::move(in_db.unique_file_id);
|
||||
auto file_source_id = in_db.file_source_id;
|
||||
auto new_promise = [self = actor_id(this), in_db = std::move(in_db),
|
||||
promise = std::move(promise)](Result<FileId> res) mutable {
|
||||
promise = std::move(promise)](Result<FileSearchInfo> res) mutable {
|
||||
if (res.is_error()) {
|
||||
promise.set_value({});
|
||||
return;
|
||||
@ -366,25 +354,28 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
send_closure(self, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db));
|
||||
promise.set_value({});
|
||||
};
|
||||
// auto send_closure(G()->messages_manager(), &MessagesManager::todo, unique_file_id, file_source_id,
|
||||
// std::move(new_promise));
|
||||
send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_info, file_source_id,
|
||||
std::move(unique_file_id), std::move(new_promise));
|
||||
}
|
||||
}
|
||||
|
||||
void add_file_from_db(FileId file_id, FileDownloadInDb in_db) {
|
||||
void add_file_from_db(FileSearchInfo file_search_info, FileDownloadInDb in_db) {
|
||||
FileInfo file_info;
|
||||
file_info.download_id = in_db.download_id;
|
||||
file_info.file_id = file_id;
|
||||
file_info.internal_file_id = callback_->dup_file_id(file_id);
|
||||
file_info.file_id = file_search_info.file_id;
|
||||
file_info.internal_file_id = callback_->dup_file_id(file_info.file_id);
|
||||
file_info.file_source_id = in_db.file_source_id;
|
||||
file_info.is_paused = false;
|
||||
file_info.priority = narrow_cast<int8>(in_db.priority);
|
||||
file_info.completed_at = in_db.completed_at;
|
||||
file_info.created_at = in_db.created_at;
|
||||
by_internal_file_id_[file_info.internal_file_id] = file_info.download_id;
|
||||
by_file_id_[file_info.file_id] = file_info.download_id;
|
||||
//TODO: hints_.add(file_info.download_id, search_by);
|
||||
hints_.add(file_info.download_id, file_search_info.search_text);
|
||||
|
||||
auto download_id = file_info.download_id;
|
||||
auto it = files_.emplace(download_id, file_info).first;
|
||||
register_file_info(it->second);
|
||||
callback_->start_file(it->second.internal_file_id, it->second.priority);
|
||||
}
|
||||
|
||||
@ -409,6 +400,63 @@ class DownloadManagerImpl final : public DownloadManager {
|
||||
sent_counters_ = counters_;
|
||||
callback_->update_counters(counters_);
|
||||
}
|
||||
|
||||
td::Result<const FileInfo *> get_file_info(FileId file_id, FileSourceId file_source_id = {}) {
|
||||
auto it = by_file_id_.find(file_id);
|
||||
if (it == by_file_id_.end()) {
|
||||
return Status::Error(400, "Can't find file");
|
||||
}
|
||||
return get_file_info(it->second, file_source_id);
|
||||
}
|
||||
|
||||
td::Result<const FileInfo *> get_file_info_by_internal(FileId file_id) {
|
||||
auto it = by_internal_file_id_.find(file_id);
|
||||
if (it == by_internal_file_id_.end()) {
|
||||
return Status::Error(400, "Can't find file");
|
||||
}
|
||||
return get_file_info(it->second);
|
||||
}
|
||||
|
||||
td::Result<const FileInfo *> get_file_info(int64 download_id, FileSourceId file_source_id = {}) {
|
||||
auto it = files_.find(download_id);
|
||||
if (it == files_.end()) {
|
||||
return Status::Error(400, "Can't find file");
|
||||
}
|
||||
if (file_source_id.is_valid() && file_source_id != it->second.file_source_id) {
|
||||
return Status::Error(400, "Can't find file with such source");
|
||||
}
|
||||
return &it->second;
|
||||
}
|
||||
|
||||
void unregister_file_info(const FileInfo &file_info) {
|
||||
active_files_count_ -= is_active(file_info);
|
||||
counters_.downloaded_size -= file_info.downloaded_size;
|
||||
counters_.total_size -= file_info.size;
|
||||
}
|
||||
void register_file_info(FileInfo &file_info) {
|
||||
active_files_count_ += is_active(file_info);
|
||||
counters_.downloaded_size += file_info.downloaded_size;
|
||||
counters_.total_size += file_info.size;
|
||||
if (!is_active(file_info) && file_info.completed_at == 0) {
|
||||
file_info.completed_at = static_cast<int32>(G()->server_time());
|
||||
file_info.need_save_to_db = true;
|
||||
}
|
||||
sync_with_db(file_info);
|
||||
update_counters();
|
||||
}
|
||||
template <class F>
|
||||
void with_file_info(const FileInfo &const_file_info, F &&f) {
|
||||
unregister_file_info(const_file_info);
|
||||
auto &file_info = const_cast<FileInfo &>(const_file_info);
|
||||
f(file_info);
|
||||
register_file_info(file_info);
|
||||
}
|
||||
Status check_is_active() {
|
||||
if (!callback_) {
|
||||
return Status::Error("TODO: code and message`");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
unique_ptr<DownloadManager> DownloadManager::create() {
|
||||
@ -421,6 +469,7 @@ tl_object_ptr<td_api::foundFileDownloads> DownloadManager::FoundFileDownloads::t
|
||||
tl_object_ptr<td_api::updateFileDownloads> DownloadManager::Counters::to_td_api() const {
|
||||
return make_tl_object<td_api::updateFileDownloads>(total_size, total_count, downloaded_size);
|
||||
}
|
||||
|
||||
template <class StorerT>
|
||||
void DownloadManager::Counters::store(StorerT &storer) const {
|
||||
BEGIN_STORE_FLAGS();
|
||||
@ -429,11 +478,12 @@ void DownloadManager::Counters::store(StorerT &storer) const {
|
||||
td::store(total_count, storer);
|
||||
td::store(downloaded_size, storer);
|
||||
}
|
||||
|
||||
template <class ParserT>
|
||||
void DownloadManager::Counters::parse(ParserT &parser) {
|
||||
BEGIN_PARSE_FLAGS();
|
||||
END_PARSE_FLAGS();
|
||||
|
||||
td::parse(total_size, parser);
|
||||
td::parse(total_count, parser);
|
||||
td::parse(downloaded_size, parser);
|
||||
}
|
||||
|
@ -30,7 +30,8 @@ class DownloadManager : public Actor {
|
||||
int64 downloaded_size{};
|
||||
|
||||
bool operator==(const Counters &other) const {
|
||||
return total_size == other.total_size && total_count == other.total_count && downloaded_size == other.downloaded_size;
|
||||
return total_size == other.total_size && total_count == other.total_count &&
|
||||
downloaded_size == other.downloaded_size;
|
||||
}
|
||||
|
||||
tl_object_ptr<td_api::updateFileDownloads> to_td_api() const;
|
||||
@ -79,6 +80,7 @@ class DownloadManager : public Actor {
|
||||
virtual Status toggle_is_paused(FileId, bool is_paused) = 0;
|
||||
virtual Status toggle_all_is_paused(bool is_paused) = 0;
|
||||
virtual Status remove_file(FileId file_id, FileSourceId file_source_id, bool delete_from_cache) = 0;
|
||||
virtual Status change_search_text(FileId file_id, FileSourceId file_source_id, string search_by) = 0;
|
||||
virtual Status remove_all_files(bool only_active, bool only_completed, bool delete_from_cache) = 0;
|
||||
// Files are always added in is_paused = false state
|
||||
virtual Status add_file(FileId file_id, FileSourceId file_source_id, string search_by, int8 priority) = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user