Revert "DownloadManager: use messages db only for search query"

This reverts commit e253f857bd.
This commit is contained in:
Arseny Smirnov 2022-03-01 17:24:05 +01:00
parent 68e1e7c0e8
commit 9fdf346ee6

View File

@ -7,7 +7,6 @@
#include "td/telegram/DownloadManager.h"
#include "td/telegram/FileReferenceManager.h"
#include "td/telegram/files/FileId.hpp"
#include "td/telegram/files/FileSourceId.hpp"
#include "td/telegram/Global.h"
#include "td/telegram/logevent/LogEvent.h"
@ -32,9 +31,8 @@ namespace td {
struct FileDownloadInDb {
int64 download_id{};
FileId file_id;
string unique_file_id;
FileSourceId file_source_id;
std::string search_text;
int32 priority{};
int32 created_at{};
int32 completed_at{};
@ -46,9 +44,8 @@ struct FileDownloadInDb {
STORE_FLAG(is_paused);
END_STORE_FLAGS();
td::store(download_id, storer);
td::store(file_id, storer);
td::store(unique_file_id, storer);
td::store(file_source_id, storer);
td::store(search_text, storer);
td::store(priority, storer);
td::store(created_at, storer);
td::store(completed_at, storer);
@ -60,9 +57,8 @@ struct FileDownloadInDb {
PARSE_FLAG(is_paused);
END_PARSE_FLAGS();
td::parse(download_id, parser);
td::parse(file_id, parser);
td::parse(unique_file_id, parser);
td::parse(file_source_id, parser);
td::parse(search_text, parser);
td::parse(priority, parser);
td::parse(created_at, parser);
td::parse(completed_at, parser);
@ -198,41 +194,18 @@ class DownloadManagerImpl final : public DownloadManager {
if (total_count > limit) {
ids.resize(limit);
}
string next_offset;
if (!ids.empty()) {
next_offset = to_string(ids.back());
}
MultiPromiseActorSafe mpas("DownloadManager search");
mpas.set_ignore_errors(true);
auto lock = mpas.get_promise();
for (auto id : ids) {
auto it = files_.find(id);
const FileInfo &file_info = *it->second;
send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_search_info, file_info.file_source_id,
callback_->get_file_view(file_info.file_id).get_unique_file_id(),
[cont = mpas.get_promise()](auto) mutable { cont.set_value({}); });
}
mpas.add_promise([self = actor_id(this), ids = std::move(ids), total_count, next_offset = std::move(next_offset),
promise = std::move(promise)](Result<Unit>) mutable {
send_closure(self, &DownloadManagerImpl::finish_search, std::move(ids), total_count, std::move(next_offset),
std::move(promise));
});
lock.set_value({});
}
void finish_search(std::vector<int64> ids, int32 total_count, std::string next_offset,
Promise<td_api::object_ptr<td_api::foundFileDownloads>> promise) {
auto file_downloads = transform(ids, [&](auto id) {
auto it = files_.find(id);
if (it == files_.end()) {
return td_api::object_ptr<td_api::fileDownload>();
}
CHECK(it != files_.end());
const FileInfo &file_info = *it->second;
return callback_->get_file_download_object(file_info.file_id, file_info.file_source_id, file_info.created_at,
file_info.completed_at, file_info.is_paused);
});
td::remove_if(file_downloads,
[](const auto &file_download) { return !file_download || file_download->message_ == nullptr; });
td::remove_if(file_downloads, [](const auto &file_download) { return file_download->message_ == nullptr; });
string next_offset;
if (!ids.empty()) {
next_offset = to_string(ids.back());
}
promise.set_value(
td_api::make_object<td_api::foundFileDownloads>(total_count, std::move(file_downloads), next_offset));
}
@ -332,7 +305,7 @@ class DownloadManagerImpl final : public DownloadManager {
to_save.priority = file_info.priority;
to_save.created_at = file_info.created_at;
to_save.completed_at = file_info.completed_at;
to_save.file_id = file_info.file_id;
to_save.unique_file_id = callback_->get_file_view(file_info.file_id).get_unique_file_id();
G()->td_db()->get_binlog_pmc()->set(pmc_key(file_info), log_event_store(to_save).as_slice().str());
}
static void remove_from_db(const FileInfo &file_info) {
@ -358,6 +331,9 @@ class DownloadManagerImpl final : public DownloadManager {
auto downloads_in_kv = G()->td_db()->get_binlog_pmc()->prefix_get("dlds#");
MultiPromiseActorSafe mpas("DownloadManager: initialization");
mpas.add_promise(promise_send_closure(actor_id(this), &DownloadManagerImpl::on_synchronized));
mpas.set_ignore_errors(true);
auto lock = mpas.get_promise();
for (auto &it : downloads_in_kv) {
Slice key = it.first;
Slice value = it.second;
@ -365,14 +341,27 @@ class DownloadManagerImpl final : public DownloadManager {
log_event_parse(in_db, value).ensure();
CHECK(in_db.download_id == to_integer_safe<int64>(key).ok());
max_download_id_ = max(in_db.download_id, max_download_id_);
add_file_from_db(std::move(in_db));
auto promise = mpas.get_promise();
auto unique_file_id = std::move(in_db.unique_file_id);
auto file_source_id = in_db.file_source_id;
auto new_promise = [actor_id = actor_id(this), in_db = std::move(in_db),
promise = std::move(promise)](Result<FileSearchInfo> res) mutable {
if (res.is_error()) {
return promise.set_value(Unit());
}
send_closure(actor_id, &DownloadManagerImpl::add_file_from_db, res.move_as_ok(), std::move(in_db),
std::move(promise));
};
send_closure(G()->file_reference_manager(), &FileReferenceManager::get_file_search_info, file_source_id,
std::move(unique_file_id), std::move(new_promise));
}
lock.set_value(Unit());
}
void add_file_from_db(FileDownloadInDb in_db) {
if (by_file_id_.count(in_db.file_id) != 0) {
void add_file_from_db(FileSearchInfo file_search_info, FileDownloadInDb in_db, Promise<Unit> promise) {
if (by_file_id_.count(file_search_info.file_id) != 0) {
// file has already been added
return;
return promise.set_value(Unit());
}
if (in_db.completed_at > 0) {
@ -382,14 +371,16 @@ class DownloadManagerImpl final : public DownloadManager {
auto file_info = make_unique<FileInfo>();
file_info->download_id = in_db.download_id;
file_info->file_id = in_db.file_id;
file_info->file_id = file_search_info.file_id;
file_info->file_source_id = in_db.file_source_id;
file_info->is_paused = in_db.is_paused;
file_info->priority = narrow_cast<int8>(in_db.priority);
file_info->completed_at = in_db.completed_at;
file_info->created_at = in_db.created_at;
add_file_info(std::move(file_info), std::move(in_db.search_text));
add_file_info(std::move(file_info), file_search_info.search_text);
promise.set_value(Unit());
}
void add_file_info(unique_ptr<FileInfo> &&file_info, const string &search_text) {