DownloadManager create unique callback for each download

This commit is contained in:
Arseny Smirnov 2022-03-01 15:34:51 +01:00
parent 1fce347ee0
commit 52abb9c5d5
4 changed files with 42 additions and 33 deletions

View File

@ -221,11 +221,14 @@ class DownloadManagerImpl final : public DownloadManager {
return;
}
auto &file_info = *r_file_info_ptr.ok();
if (file_info.link_token != get_link_token()) {
LOG(INFO) << "Ignore update_file_download_state because of outdated link_token";
}
with_file_info(file_info, [&](FileInfo &file_info) {
file_info.size = size;
file_info.downloaded_size = download_size;
if (file_info.is_paused != is_paused) {
if (is_paused && file_info.is_paused != is_paused) {
file_info.is_paused = is_paused;
file_info.need_save_to_db = true;
}
@ -259,6 +262,7 @@ class DownloadManagerImpl final : public DownloadManager {
int64 downloaded_size{};
int32 created_at{};
int32 completed_at{};
uint64 link_token{};
};
FlatHashMap<FileId, int64, FileIdHash> by_file_id_;
@ -272,6 +276,7 @@ class DownloadManagerImpl final : public DownloadManager {
bool is_synchonized_{false};
bool is_started_{false};
int64 max_download_id_{0};
uint64 last_link_token_{0};
int64 next_download_id() {
return ++max_download_id_;
@ -389,13 +394,14 @@ class DownloadManagerImpl final : public DownloadManager {
by_internal_file_id_[file_info->internal_file_id] = download_id;
by_file_id_[file_info->file_id] = download_id;
hints_.add(download_id, search_text.empty() ? string(" ") : search_text);
file_info->link_token = ++last_link_token_;
LOG(INFO) << "Adding to downloads file " << file_info->file_id << '/' << file_info->internal_file_id
<< " with is_paused = " << file_info->is_paused;
auto it = files_.emplace(download_id, std::move(file_info)).first;
register_file_info(*it->second);
if (it->second->completed_at == 0) {
callback_->start_file(it->second->internal_file_id, it->second->priority);
callback_->start_file(it->second->internal_file_id, it->second->priority, it->second->link_token);
}
}
@ -419,11 +425,12 @@ class DownloadManagerImpl final : public DownloadManager {
with_file_info(file_info, [&](auto &file_info) {
file_info.is_paused = is_paused;
file_info.need_save_to_db = true;
file_info.link_token = ++last_link_token_;
});
if (is_paused) {
callback_->pause_file(file_info.internal_file_id);
} else {
callback_->start_file(file_info.internal_file_id, file_info.priority);
callback_->start_file(file_info.internal_file_id, file_info.priority, file_info.link_token);
}
}

View File

@ -49,7 +49,7 @@ class DownloadManager : public Actor {
virtual ~Callback() = default;
virtual void update_counters(Counters counters) = 0;
virtual void update_file_removed(FileId file_id) = 0;
virtual void start_file(FileId file_id, int8 priority) = 0;
virtual void start_file(FileId file_id, int8 priority, uint64 link_token) = 0;
virtual void pause_file(FileId file_id) = 0;
virtual void delete_file(FileId file_id) = 0;
virtual FileId dup_file_id(FileId file_id) = 0;

View File

@ -3984,9 +3984,9 @@ void Td::init_managers() {
send_closure(G()->td(), &Td::send_update,
td_api::make_object<td_api::updateFileRemovedFromDownloads>(file_id.get()));
}
void start_file(FileId file_id, int8 priority) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(), priority,
FileManager::KEEP_DOWNLOAD_OFFSET, FileManager::IGNORE_DOWNLOAD_LIMIT);
void start_file(FileId file_id, int8 priority, uint64 link_token) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, make_download_file_callback(link_token),
priority, FileManager::KEEP_DOWNLOAD_OFFSET, FileManager::IGNORE_DOWNLOAD_LIMIT);
}
void pause_file(FileId file_id) final {
send_closure(G()->file_manager(), &FileManager::download, file_id, nullptr, 0, FileManager::KEEP_DOWNLOAD_OFFSET,
@ -4016,33 +4016,32 @@ void Td::init_managers() {
private:
ActorShared<> parent_;
ActorId<DownloadManager> download_manager_;
std::shared_ptr<FileManager::DownloadCallback> download_file_callback_;
std::shared_ptr<FileManager::DownloadCallback> make_download_file_callback() {
if (!download_file_callback_) {
std::shared_ptr<FileManager::DownloadCallback> make_download_file_callback(uint64 link_token) {
class Impl final : public FileManager::DownloadCallback {
public:
explicit Impl(ActorId<DownloadManager> download_manager) : download_manager_(download_manager) {
explicit Impl(ActorShared<DownloadManager> download_manager) : download_manager_(std::move(download_manager)) {
}
void on_progress(FileId file_id) final {
auto td = G()->td().get_actor_unsafe();
auto file_view = td->file_manager_->get_file_view(file_id);
send_closure(download_manager_, &DownloadManager::update_file_download_state, file_id,
file_view.local_total_size(), file_view.expected_size(), !file_view.is_downloading());
// TODO: handle deleted state?
send_update(file_id, false);
}
void on_download_ok(FileId file_id) final {
on_progress(file_id);
send_update(file_id, true);
}
void on_download_error(FileId file_id, Status error) {
on_progress(file_id);
send_update(file_id, true);
}
private:
ActorId<DownloadManager> download_manager_;
};
download_file_callback_ = std::make_shared<Impl>(download_manager_);
ActorShared<DownloadManager> download_manager_;
void send_update(FileId file_id, bool is_paused) {
auto td = G()->td().get_actor_unsafe();
auto file_view = td->file_manager_->get_file_view(file_id);
send_closure(download_manager_, &DownloadManager::update_file_download_state, file_id,
file_view.local_total_size(), file_view.size(), is_paused);
// TODO: handle deleted state?
}
return download_file_callback_;
};
return std::make_shared<Impl>(ActorShared<DownloadManager>(download_manager_, link_token));
}
};
send_closure_later(download_manager_actor_, &DownloadManager::set_callback,

View File

@ -3915,5 +3915,8 @@ void FileManager::hangup() {
void FileManager::tear_down() {
parent_.reset();
}
constexpr int64 FileManager::KEEP_DOWNLOAD_LIMIT;
constexpr int64 FileManager::KEEP_DOWNLOAD_OFFSET;
constexpr int64 FileManager::IGNORE_DOWNLOAD_LIMIT;
} // namespace td