Inline FileLoader to FileUploader.

This commit is contained in:
levlam 2024-07-13 00:59:24 +03:00
parent 8fee0251f9
commit bb0eb35027
4 changed files with 276 additions and 99 deletions

View File

@ -11,7 +11,6 @@
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
@ -52,23 +51,6 @@ void FileLoader::hangup_shared() {
}
}
void FileLoader::update_local_file_location(const LocalFileLocation &local) {
auto r_prefix_info = on_update_local_location(local, parts_manager_.get_size_or_zero());
if (r_prefix_info.is_error()) {
on_error(r_prefix_info.move_as_error());
stop_flag_ = true;
return;
}
auto prefix_info = r_prefix_info.move_as_ok();
auto status = parts_manager_.set_known_prefix(prefix_info.size, prefix_info.is_ready);
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
loop();
}
void FileLoader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
if (parts_manager_.get_streaming_offset() != offset) {
auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit);

View File

@ -31,17 +31,14 @@ class FileLoader : public FileLoaderActor {
void update_priority(int8 priority) final;
void update_resources(const ResourceState &other) final;
void update_local_file_location(const LocalFileLocation &local) final;
void update_local_file_location(const LocalFileLocation &local) final {
}
void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) final;
protected:
void set_ordered_flag(bool flag);
size_t get_part_size() const;
struct PrefixInfo {
int64 size = -1;
bool is_ready = false;
};
struct FileInfo {
int64 size{0};
int64 expected_size{0};
@ -75,10 +72,7 @@ class FileLoader : public FileLoaderActor {
int64 size{0};
};
virtual void on_progress(Progress progress) = 0;
virtual Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location,
int64 file_size) TD_WARN_UNUSED_RESULT {
return Status::Error("Unsupported");
}
virtual Result<bool> should_restart_part(Part part, const NetQueryPtr &net_query) TD_WARN_UNUSED_RESULT {
return false;
}

View File

@ -12,6 +12,7 @@
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/SecureStorage.h"
#include "td/telegram/telegram_api.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/buffer.h"
#include "td/utils/crypto.h"
@ -43,15 +44,21 @@ FileUploader::FileUploader(const LocalFileLocation &local, const RemoteFileLocat
}
}
Result<FileLoader::FileInfo> FileUploader::init() {
void FileUploader::start_up() {
if (remote_.type() == RemoteFileLocation::Type::Full) {
return Status::Error("File is already uploaded");
on_error(Status::Error("File is already uploaded"));
stop_flag_ = true;
return;
}
// file_size is needed only for partial local locations, but for uploaded partial files
// size is yet unknown or local location is full, so we can always pass 0 here
TRY_RESULT(prefix_info, on_update_local_location(local_, 0));
(void)prefix_info;
auto r_prefix_info = on_update_local_location(local_, 0);
if (r_prefix_info.is_error()) {
on_error(r_prefix_info.move_as_error());
stop_flag_ = true;
return;
}
int offset = 0;
int part_size = 0;
@ -66,20 +73,20 @@ Result<FileLoader::FileInfo> FileUploader::init() {
big_flag_ = is_file_big(file_type_, expected_size_);
}
std::vector<bool> ok(offset, true);
vector<bool> ok(offset, true);
for (auto bad_id : bad_parts_) {
if (bad_id >= 0 && bad_id < offset) {
ok[bad_id] = false;
}
}
std::vector<int> parts;
vector<int> ready_parts;
for (int i = 0; i < offset; i++) {
if (ok[i]) {
parts.push_back(i);
ready_parts.push_back(i);
}
}
if (!ok.empty() && !ok[0]) {
parts.clear();
ready_parts.clear();
part_size = 0;
remote_ = RemoteFileLocation();
file_id_ = Random::secure_int64();
@ -87,18 +94,36 @@ Result<FileLoader::FileInfo> FileUploader::init() {
}
LOG(DEBUG) << "Init file uploader for " << remote_ << " with offset = " << offset << " and part size = " << part_size;
FileInfo res;
res.size = local_size_;
res.expected_size = expected_size_;
res.is_size_final = local_is_ready_;
res.part_size = part_size;
res.ready_parts = std::move(parts);
res.is_upload = true;
return res;
auto expected_size = max(local_size_, expected_size_);
// Two cases when FILE_UPLOAD_RESTART will happen
// 1. File is ready, size is final. But there are more uploaded parts than size of the file
// pm.init(1, 100000, true, 10, {0, 1, 2}, false, true).ensure_error();
// This error is definitely ok, because we are using actual size of the file on disk (mtime is checked by
// somebody else). And actual size could change arbitrarily.
//
// 2. File size is not final, and some parts ending after known file size were uploaded
// pm.init(0, 100000, false, 10, {0, 1, 2}, false, true).ensure_error();
// This can happen only if file state became inconsistent at some point. For example, local location was deleted,
// but partial remote location was kept. This is possible, but probably should be fixed.
auto status = parts_manager_.init(local_size_, expected_size, local_is_ready_, part_size, ready_parts, true, true);
LOG(DEBUG) << "Start uploading a file of size " << local_size_ << " with expected "
<< (local_is_ready_ ? "exact" : "approximate") << " size " << expected_size << ", part size " << part_size
<< " and " << ready_parts.size() << " ready parts: " << status;
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
resource_state_.set_unit_size(parts_manager_.get_part_size());
update_estimated_limit();
on_progress();
yield();
}
Result<FileLoader::PrefixInfo> FileUploader::on_update_local_location(const LocalFileLocation &location,
int64 file_size) {
Result<FileUploader::PrefixInfo> FileUploader::on_update_local_location(const LocalFileLocation &location,
int64 file_size) {
SCOPE_EXIT {
try_release_fd();
};
@ -195,15 +220,6 @@ Result<FileLoader::PrefixInfo> FileUploader::on_update_local_location(const Loca
return info;
}
Status FileUploader::on_ok(int64 size) {
fd_.close();
if (is_temp_) {
LOG(INFO) << "UNLINK " << fd_path_;
unlink(fd_path_).ignore();
}
return Status::OK();
}
void FileUploader::on_error(Status status) {
fd_.close();
if (is_temp_) {
@ -215,7 +231,7 @@ void FileUploader::on_error(Status status) {
Status FileUploader::generate_iv_map() {
LOG(INFO) << "Generate iv_map " << generate_offset_ << " " << local_size_;
auto part_size = get_part_size();
auto part_size = parts_manager_.get_part_size();
auto encryption_key = FileEncryptionKey(encryption_key_.key_slice(), generate_iv_);
BufferSlice bytes(part_size);
if (iv_map_.empty()) {
@ -236,19 +252,7 @@ Status FileUploader::generate_iv_map() {
return Status::OK();
}
Status FileUploader::before_start_parts() {
auto status = acquire_fd();
if (status.is_error() && !local_is_ready_) {
return Status::Error(-1, "Can't open temporary file");
}
return status;
}
void FileUploader::after_start_parts() {
try_release_fd();
}
Result<NetQueryPtr> FileUploader::start_part(Part part, int32 part_count, int64 streaming_offset) {
Result<NetQueryPtr> FileUploader::start_part(Part part, int32 part_count) {
auto padded_size = part.size;
if (encryption_key_.is_secret()) {
padded_size = (padded_size + 15) & ~15;
@ -307,23 +311,19 @@ Result<size_t> FileUploader::process_part(Part part, NetQueryPtr net_query) {
return part.size;
}
void FileUploader::on_progress(Progress progress) {
callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size,
progress.ready_part_count, big_flag_},
progress.ready_size);
if (progress.is_ready) {
void FileUploader::on_progress() {
auto part_count = parts_manager_.get_part_count();
auto part_size = static_cast<int32>(parts_manager_.get_part_size());
auto ready_part_count = parts_manager_.get_ready_prefix_count();
callback_->on_partial_upload(PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_},
parts_manager_.get_ready_size());
if (parts_manager_.ready()) {
callback_->on_ok(file_type_,
PartialRemoteFileLocation{file_id_, progress.part_count, progress.part_size,
progress.ready_part_count, big_flag_},
PartialRemoteFileLocation{file_id_, part_count, part_size, ready_part_count, big_flag_},
local_size_);
}
}
void FileUploader::keep_fd_flag(bool keep_fd) {
keep_fd_ = keep_fd;
try_release_fd();
}
void FileUploader::try_release_fd() {
if (!keep_fd_ && !fd_.empty()) {
fd_.close();
@ -337,4 +337,164 @@ Status FileUploader::acquire_fd() {
return Status::OK();
}
void FileUploader::set_resource_manager(ActorShared<ResourceManager> resource_manager) {
resource_manager_ = std::move(resource_manager);
send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
void FileUploader::update_priority(int8 priority) {
send_closure(resource_manager_, &ResourceManager::update_priority, priority);
}
void FileUploader::update_resources(const ResourceState &other) {
resource_state_.update_slave(other);
VLOG(file_loader) << "Update resources " << resource_state_;
loop();
}
void FileUploader::update_local_file_location(const LocalFileLocation &local) {
auto r_prefix_info = on_update_local_location(local, parts_manager_.get_size_or_zero());
if (r_prefix_info.is_error()) {
on_error(r_prefix_info.move_as_error());
stop_flag_ = true;
return;
}
auto prefix_info = r_prefix_info.move_as_ok();
auto status = parts_manager_.set_known_prefix(prefix_info.size, prefix_info.is_ready);
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
return;
}
loop();
}
void FileUploader::loop() {
if (stop_flag_) {
return;
}
auto status = do_loop();
if (status.is_error()) {
if (status.code() == -1) {
return;
}
on_error(std::move(status));
stop_flag_ = true;
return;
}
}
Status FileUploader::do_loop() {
if (parts_manager_.may_finish()) {
TRY_STATUS(parts_manager_.finish());
fd_.close();
if (is_temp_) {
LOG(INFO) << "UNLINK " << fd_path_;
unlink(fd_path_).ignore();
}
stop_flag_ = true;
return Status::OK();
}
auto status = acquire_fd();
if (status.is_error()) {
if (!local_is_ready_) {
return Status::Error(-1, "Can't open temporary file");
}
return status;
}
SCOPE_EXIT {
try_release_fd();
};
while (true) {
if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) {
VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource";
break;
}
TRY_RESULT(part, parts_manager_.start_part());
if (part.size == 0) {
break;
}
VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
resource_state_.start_use(static_cast<int64>(part.size));
TRY_RESULT(query, start_part(part, parts_manager_.get_part_count()));
uint64 unique_id = UniqueId::next();
part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, unique_id));
}
return Status::OK();
}
void FileUploader::tear_down() {
for (auto &it : part_map_) {
it.second.second.reset(); // cancel_query(it.second.second);
}
}
void FileUploader::update_estimated_limit() {
if (stop_flag_) {
return;
}
auto estimated_extra = parts_manager_.get_estimated_extra();
resource_state_.update_estimated_limit(estimated_extra);
VLOG(file_loader) << "Update estimated limit " << estimated_extra;
if (!resource_manager_.empty()) {
keep_fd_ = narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size();
try_release_fd();
send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
}
}
void FileUploader::on_result(NetQueryPtr query) {
if (stop_flag_) {
return;
}
auto unique_id = get_link_token();
auto it = part_map_.find(unique_id);
if (it == part_map_.end()) {
LOG(ERROR) << "Receive result for unknown part";
return;
}
Part part = it->second.first;
it->second.second.release();
CHECK(query->is_ready());
part_map_.erase(it);
bool should_restart = query->is_error() && query->error().code() == NetQuery::Error::Canceled;
if (should_restart) {
VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
parts_manager_.on_part_failed(part.id);
} else {
on_part_query(part, std::move(query));
}
update_estimated_limit();
loop();
}
void FileUploader::on_part_query(Part part, NetQueryPtr query) {
if (stop_flag_) {
// important for secret files
return;
}
auto status = try_on_part_query(part, std::move(query));
if (status.is_error()) {
on_error(std::move(status));
stop_flag_ = true;
}
}
Status FileUploader::try_on_part_query(Part part, NetQueryPtr query) {
TRY_RESULT(size, process_part(part, std::move(query)));
VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size));
on_progress();
return Status::OK();
}
} // namespace td

View File

@ -7,20 +7,25 @@
#pragma once
#include "td/telegram/files/FileEncryptionKey.h"
#include "td/telegram/files/FileLoader.h"
#include "td/telegram/files/FileLoaderActor.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/files/PartsManager.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/files/ResourceState.h"
#include "td/telegram/net/NetQuery.h"
#include "td/utils/common.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/Status.h"
#include "td/utils/UInt.h"
#include <map>
#include <utility>
namespace td {
class FileUploader final : public FileLoader {
class FileUploader final : public FileLoaderActor {
public:
class Callback {
public:
@ -37,20 +42,29 @@ class FileUploader final : public FileLoader {
FileUploader(const LocalFileLocation &local, const RemoteFileLocation &remote, int64 expected_size,
const FileEncryptionKey &encryption_key, std::vector<int> bad_parts, unique_ptr<Callback> callback);
// Should just implement all parent pure virtual methods.
// Must not call any of them...
void set_resource_manager(ActorShared<ResourceManager> resource_manager) final;
void update_priority(int8 priority) final;
void update_resources(const ResourceState &other) final;
void update_local_file_location(const LocalFileLocation &local) final;
void update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
}
private:
LocalFileLocation local_;
RemoteFileLocation remote_;
int64 expected_size_;
FileEncryptionKey encryption_key_;
std::vector<int> bad_parts_;
vector<int> bad_parts_;
unique_ptr<Callback> callback_;
int64 local_size_ = 0;
bool local_is_ready_ = false;
FileType file_type_ = FileType::Temp;
std::vector<UInt256> iv_map_;
vector<UInt256> iv_map_;
UInt256 iv_;
string generate_iv_;
int64 generate_offset_ = 0;
@ -58,27 +72,54 @@ class FileUploader final : public FileLoader {
FileFd fd_;
string fd_path_;
bool is_temp_ = false;
int64 file_id_ = 0;
bool is_temp_ = false;
bool big_flag_ = false;
bool keep_fd_ = false;
bool stop_flag_ = false;
Result<FileInfo> init() final TD_WARN_UNUSED_RESULT;
Status on_ok(int64 size) final TD_WARN_UNUSED_RESULT;
void on_error(Status status) final;
Status before_start_parts() final;
void after_start_parts() final;
Result<NetQueryPtr> start_part(Part part, int32 part_count, int64 streaming_offset) final TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) final TD_WARN_UNUSED_RESULT;
void on_progress(Progress progress) final;
Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location,
int64 file_size) final TD_WARN_UNUSED_RESULT;
ActorShared<ResourceManager> resource_manager_;
ResourceState resource_state_;
PartsManager parts_manager_;
std::map<uint64, std::pair<Part, ActorShared<>>> part_map_;
void on_error(Status status);
Result<NetQueryPtr> start_part(Part part, int32 part_count) TD_WARN_UNUSED_RESULT;
Result<size_t> process_part(Part part, NetQueryPtr net_query) TD_WARN_UNUSED_RESULT;
void on_progress();
struct PrefixInfo {
int64 size = -1;
bool is_ready = false;
};
Result<PrefixInfo> on_update_local_location(const LocalFileLocation &location, int64 file_size) TD_WARN_UNUSED_RESULT;
Status generate_iv_map();
bool keep_fd_ = false;
void keep_fd_flag(bool keep_fd) final;
void try_release_fd();
Status acquire_fd() TD_WARN_UNUSED_RESULT;
void start_up() final;
void loop() final;
Status do_loop();
void tear_down() final;
void update_estimated_limit();
void on_result(NetQueryPtr query) final;
void on_part_query(Part part, NetQueryPtr query);
void on_common_query(NetQueryPtr query);
Status try_on_part_query(Part part, NetQueryPtr query);
};
} // namespace td