FileManager: force upload, and separate full and partial remote location storage
GitOrigin-RevId: 3bdc218738558012ee6e2a790a553b2f0fea1d62
This commit is contained in:
parent
06f9533a5a
commit
82589eeb2f
@ -43,6 +43,30 @@ namespace {
|
||||
constexpr int64 MAX_FILE_SIZE = 1500 * (1 << 20) /* 1500MB */;
|
||||
} // namespace
|
||||
|
||||
NewRemoteFileLocation::NewRemoteFileLocation(RemoteFileLocation remote, FileLocationSource source) {
|
||||
switch (remote.type()) {
|
||||
case RemoteFileLocation::Type::Empty: {
|
||||
return;
|
||||
}
|
||||
case RemoteFileLocation::Type::Partial: {
|
||||
partial = make_unique<PartialRemoteFileLocation>(remote.partial());
|
||||
return;
|
||||
}
|
||||
case RemoteFileLocation::Type::Full: {
|
||||
full = remote.full();
|
||||
full_source = source;
|
||||
is_full_alive = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RemoteFileLocation NewRemoteFileLocation::partial_or_empty() const {
|
||||
if (partial) {
|
||||
return RemoteFileLocation(*partial);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
int VERBOSITY_NAME(update_file) = VERBOSITY_NAME(DEBUG);
|
||||
|
||||
FileNode *FileNodePtr::operator->() const {
|
||||
@ -138,40 +162,67 @@ void FileNode::set_local_location(const LocalFileLocation &local, int64 ready_si
|
||||
}
|
||||
}
|
||||
|
||||
void FileNode::set_remote_location(const RemoteFileLocation &remote, FileLocationSource source, int64 ready_size) {
|
||||
if (remote_ready_size_ != ready_size) {
|
||||
VLOG(update_file) << "File " << main_file_id_ << " has changed remote ready size from " << remote_ready_size_
|
||||
<< " to " << ready_size;
|
||||
remote_ready_size_ = ready_size;
|
||||
on_info_changed();
|
||||
}
|
||||
if (remote_ == remote) {
|
||||
if (remote_.type() == RemoteFileLocation::Type::Full) {
|
||||
if (remote_.full().get_access_hash() != remote.full().get_access_hash() ||
|
||||
remote_.full().get_raw_file_reference() != remote.full().get_raw_file_reference()) {
|
||||
remote_ = remote;
|
||||
remote_source_ = source;
|
||||
void FileNode::set_new_remote_location(NewRemoteFileLocation new_remote) {
|
||||
if (new_remote.full) {
|
||||
if (remote_.full && remote_.full.value() == new_remote.full.value()) {
|
||||
if (remote_.full.value().get_access_hash() == new_remote.full.value().get_access_hash() &&
|
||||
remote_.full.value().get_raw_file_reference() == new_remote.full.value().get_raw_file_reference()) {
|
||||
} else {
|
||||
on_pmc_changed();
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
VLOG(update_file) << "File " << main_file_id_ << " has changed remote location";
|
||||
on_changed();
|
||||
}
|
||||
remote_.full = new_remote.full;
|
||||
remote_.is_full_alive = new_remote.is_full_alive;
|
||||
} else {
|
||||
if (remote_.full) {
|
||||
remote_.full = {};
|
||||
remote_.is_full_alive = false;
|
||||
on_changed();
|
||||
}
|
||||
}
|
||||
|
||||
VLOG(update_file) << "File " << main_file_id_ << " has changed remote location";
|
||||
remote_ = remote;
|
||||
remote_source_ = source;
|
||||
if (new_remote.partial) {
|
||||
set_partial_remote_location(*new_remote.partial, new_remote.ready_size);
|
||||
} else {
|
||||
delete_partial_remote_location();
|
||||
}
|
||||
}
|
||||
void FileNode::delete_partial_remote_location() {
|
||||
if (remote_.partial) {
|
||||
remote_.partial.reset();
|
||||
on_changed();
|
||||
}
|
||||
}
|
||||
void FileNode::set_partial_remote_location(const PartialRemoteFileLocation &remote, int64 ready_size) {
|
||||
if (remote_.is_full_alive) {
|
||||
return;
|
||||
}
|
||||
if (remote_.ready_size != ready_size) {
|
||||
VLOG(update_file) << "File " << main_file_id_ << " has changed remote ready size from " << remote_.ready_size
|
||||
<< " to " << ready_size;
|
||||
remote_.ready_size = ready_size;
|
||||
on_info_changed();
|
||||
}
|
||||
if (remote_.partial && *remote_.partial == remote) {
|
||||
return;
|
||||
}
|
||||
remote_.partial = make_unique<PartialRemoteFileLocation>(remote);
|
||||
on_changed();
|
||||
return;
|
||||
}
|
||||
|
||||
bool FileNode::delete_file_reference(Slice file_reference) {
|
||||
if (remote_.type() != RemoteFileLocation::Type::Full) {
|
||||
if (!remote_.full) {
|
||||
VLOG(file_references) << "Can't delete file reference, because there is no remote location";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!remote_.full().delete_file_reference(file_reference)) {
|
||||
if (!remote_.full.value().delete_file_reference(file_reference)) {
|
||||
VLOG(file_references) << "Can't delete unmatching file reference " << format::escaped(file_reference) << ", have "
|
||||
<< format::escaped(remote_.full().get_raw_file_reference());
|
||||
<< format::escaped(remote_.full.value().get_raw_file_reference());
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -251,7 +302,7 @@ void FileNode::set_download_priority(int8 priority) {
|
||||
}
|
||||
|
||||
void FileNode::set_upload_priority(int8 priority) {
|
||||
if (remote_.type() != RemoteFileLocation::Type::Full && (upload_priority_ == 0) != (priority == 0)) {
|
||||
if (!remote_.is_full_alive && (upload_priority_ == 0) != (priority == 0)) {
|
||||
VLOG(update_file) << "File " << main_file_id_ << " has changed upload priority to " << priority;
|
||||
on_info_changed();
|
||||
}
|
||||
@ -307,13 +358,12 @@ bool FileNode::need_pmc_flush() const {
|
||||
has_generate_location = false;
|
||||
}
|
||||
|
||||
if (remote_.type() == RemoteFileLocation::Type::Full/* &&
|
||||
if (remote_.full/* &&
|
||||
(has_generate_location || local_.type() != LocalFileLocation::Type::Empty)*/) {
|
||||
// we need to always save file sources
|
||||
return true;
|
||||
}
|
||||
if (local_.type() == LocalFileLocation::Type::Full &&
|
||||
(has_generate_location || remote_.type() != RemoteFileLocation::Type::Empty)) {
|
||||
if (local_.type() == LocalFileLocation::Type::Full && (has_generate_location || remote_.full || remote_.partial)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -358,11 +408,14 @@ const FullLocalFileLocation &FileView::local_location() const {
|
||||
}
|
||||
|
||||
bool FileView::has_remote_location() const {
|
||||
return node_->remote_.type() == RemoteFileLocation::Type::Full;
|
||||
return bool(node_->remote_.full);
|
||||
}
|
||||
bool FileView::has_alive_remote_location() const {
|
||||
return node_->remote_.is_full_alive;
|
||||
}
|
||||
|
||||
bool FileView::has_active_upload_remote_location() const {
|
||||
if (!has_remote_location()) {
|
||||
if (!has_alive_remote_location()) {
|
||||
return false;
|
||||
}
|
||||
if (remote_location().is_encrypted_any()) {
|
||||
@ -387,7 +440,7 @@ const FullRemoteFileLocation &FileView::remote_location() const {
|
||||
if (remote) {
|
||||
return *remote;
|
||||
}
|
||||
return node_->remote_.full();
|
||||
return node_->remote_.full.value();
|
||||
}
|
||||
|
||||
bool FileView::has_generate_location() const {
|
||||
@ -483,24 +536,22 @@ bool FileView::is_uploading() const {
|
||||
}
|
||||
|
||||
int64 FileView::remote_size() const {
|
||||
switch (node_->remote_.type()) {
|
||||
case RemoteFileLocation::Type::Full:
|
||||
return node_->size_;
|
||||
case RemoteFileLocation::Type::Partial: {
|
||||
auto part_size = static_cast<int64>(node_->remote_.partial().part_size_);
|
||||
auto ready_part_count = node_->remote_.partial().ready_part_count_;
|
||||
auto remote_ready_size = node_->remote_ready_size_;
|
||||
VLOG(update_file) << "Have part_size = " << part_size << ", remote_ready_part_count = " << ready_part_count
|
||||
<< ", remote_ready_size = " << remote_ready_size << ", size = " << size();
|
||||
auto res = max(part_size * ready_part_count, remote_ready_size);
|
||||
if (size() != 0 && size() < res) {
|
||||
res = size();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
default:
|
||||
return node_->remote_ready_size_;
|
||||
if (node_->remote_.is_full_alive) {
|
||||
return node_->size_;
|
||||
}
|
||||
if (node_->remote_.partial) {
|
||||
auto part_size = static_cast<int64>(node_->remote_.partial->part_size_);
|
||||
auto ready_part_count = node_->remote_.partial->ready_part_count_;
|
||||
auto remote_ready_size = node_->remote_.ready_size;
|
||||
VLOG(update_file) << "Have part_size = " << part_size << ", remote_ready_part_count = " << ready_part_count
|
||||
<< ", remote_ready_size = " << remote_ready_size << ", size = " << size();
|
||||
auto res = max(part_size * ready_part_count, remote_ready_size);
|
||||
if (size() != 0 && size() < res) {
|
||||
res = size();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
return node_->remote_.ready_size; //???
|
||||
}
|
||||
|
||||
string FileView::path() const {
|
||||
@ -573,8 +624,8 @@ bool FileView::can_delete() const {
|
||||
}
|
||||
|
||||
/*** FileManager ***/
|
||||
static int merge_choose_remote_location(const FullRemoteFileLocation &x, int8 x_source, const FullRemoteFileLocation &y,
|
||||
int8 y_source);
|
||||
static int merge_choose_remote_location(const FullRemoteFileLocation &x, FileLocationSource x_source,
|
||||
const FullRemoteFileLocation &y, FileLocationSource y_source);
|
||||
|
||||
namespace {
|
||||
void prepare_path_for_pmc(FileType file_type, string &path) {
|
||||
@ -916,11 +967,10 @@ Result<FileId> FileManager::register_file(FileData &&data, FileLocationSource fi
|
||||
// create FileNode
|
||||
auto file_node_id = next_file_node_id();
|
||||
auto &node = file_nodes_[file_node_id];
|
||||
node = td::make_unique<FileNode>(std::move(data.local_), std::move(data.remote_), std::move(data.generate_),
|
||||
data.size_, data.expected_size_, std::move(data.remote_name_), std::move(data.url_),
|
||||
data.owner_dialog_id_, std::move(data.encryption_key_), file_id,
|
||||
static_cast<int8>(has_remote));
|
||||
node->remote_source_ = file_location_source;
|
||||
node = td::make_unique<FileNode>(std::move(data.local_), NewRemoteFileLocation(data.remote_, file_location_source),
|
||||
std::move(data.generate_), data.size_, data.expected_size_,
|
||||
std::move(data.remote_name_), std::move(data.url_), data.owner_dialog_id_,
|
||||
std::move(data.encryption_key_), file_id, static_cast<int8>(has_remote));
|
||||
node->pmc_id_ = FileDbId(data.pmc_id_);
|
||||
get_file_id_info(file_id)->node_id_ = file_node_id;
|
||||
node->file_ids_.push_back(file_id);
|
||||
@ -950,9 +1000,8 @@ Result<FileId> FileManager::register_file(FileData &&data, FileLocationSource fi
|
||||
new_remote = true;
|
||||
} else {
|
||||
to_merge.push_back(stored_info.file_id_);
|
||||
if (merge_choose_remote_location(file_view.remote_location(), static_cast<uint8>(file_location_source),
|
||||
stored_info.remote_,
|
||||
static_cast<uint8>(stored_info.file_location_source_)) == 0) {
|
||||
if (merge_choose_remote_location(file_view.remote_location(), file_location_source, stored_info.remote_,
|
||||
stored_info.file_location_source_) == 0) {
|
||||
stored_info.remote_ = file_view.remote_location();
|
||||
stored_info.file_location_source_ = file_location_source;
|
||||
}
|
||||
@ -1003,8 +1052,12 @@ static int merge_choose_local_location(const LocalFileLocation &x, const LocalFi
|
||||
return 2;
|
||||
}
|
||||
|
||||
static int merge_choose_remote_location(const FullRemoteFileLocation &x, int8 x_source, const FullRemoteFileLocation &y,
|
||||
int8 y_source) {
|
||||
static int merge_choose_file_source_location(FileLocationSource x, FileLocationSource y) {
|
||||
return static_cast<int>(x) < static_cast<int>(y);
|
||||
}
|
||||
|
||||
static int merge_choose_remote_location(const FullRemoteFileLocation &x, FileLocationSource x_source,
|
||||
const FullRemoteFileLocation &y, FileLocationSource y_source) {
|
||||
if (x.is_web() != y.is_web()) {
|
||||
return x.is_web(); // prefer non-web
|
||||
}
|
||||
@ -1015,32 +1068,23 @@ static int merge_choose_remote_location(const FullRemoteFileLocation &x, int8 x_
|
||||
return !x_ref;
|
||||
}
|
||||
if (x.get_raw_file_reference() != y.get_raw_file_reference()) {
|
||||
if (x_source != y_source) {
|
||||
return x_source < y_source;
|
||||
}
|
||||
// prefer newest among two server locations
|
||||
return 0;
|
||||
return merge_choose_file_source_location(x_source, y_source);
|
||||
}
|
||||
}
|
||||
if (x.get_access_hash() != y.get_access_hash()) {
|
||||
if (x_source != y_source) {
|
||||
return x_source < y_source;
|
||||
}
|
||||
// prefer newest among two server locations
|
||||
return 0;
|
||||
return merge_choose_file_source_location(x_source, y_source);
|
||||
}
|
||||
return 2;
|
||||
}
|
||||
static int merge_choose_remote_location(const RemoteFileLocation &x, int8 x_source, const RemoteFileLocation &y,
|
||||
int8 y_source) {
|
||||
int32 x_type = static_cast<int32>(x.type());
|
||||
int32 y_type = static_cast<int32>(y.type());
|
||||
if (x_type != y_type) {
|
||||
return x_type < y_type;
|
||||
static int merge_choose_remote_location(const NewRemoteFileLocation &x, const NewRemoteFileLocation &y) {
|
||||
if (x.is_full_alive != y.is_full_alive) {
|
||||
return !x.is_full_alive;
|
||||
}
|
||||
// If access_hash changed use a newer one
|
||||
if (x.type() == RemoteFileLocation::Type::Full) {
|
||||
return merge_choose_remote_location(x.full(), x_source, y.full(), y_source);
|
||||
if (x.is_full_alive) {
|
||||
return merge_choose_remote_location(x.full.value(), x.full_source, y.full.value(), y.full_source);
|
||||
}
|
||||
if (!x.partial != !y.partial) {
|
||||
return !x.partial;
|
||||
}
|
||||
return 2;
|
||||
}
|
||||
@ -1180,21 +1224,19 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
||||
y_node->set_upload_pause(FileId());
|
||||
}
|
||||
|
||||
if (x_node->remote_.type() == RemoteFileLocation::Type::Full &&
|
||||
y_node->remote_.type() == RemoteFileLocation::Type::Full && !x_node->remote_.full().is_web() &&
|
||||
!y_node->remote_.full().is_web() && x_node->remote_.full().get_dc_id() != y_node->remote_.full().get_dc_id()) {
|
||||
LOG(ERROR) << "File remote location was changed from " << y_node->remote_.full() << " to "
|
||||
<< x_node->remote_.full();
|
||||
if (x_node->remote_.full && y_node->remote_.full && !x_node->remote_.full.value().is_web() &&
|
||||
!y_node->remote_.full.value().is_web() &&
|
||||
x_node->remote_.full.value().get_dc_id() != y_node->remote_.full.value().get_dc_id()) {
|
||||
LOG(ERROR) << "File remote location was changed from " << y_node->remote_.full.value() << " to "
|
||||
<< x_node->remote_.full.value();
|
||||
}
|
||||
|
||||
FileNodePtr nodes[] = {x_node, y_node, x_node};
|
||||
FileNodeId node_ids[] = {get_file_id_info(x_file_id)->node_id_, get_file_id_info(y_file_id)->node_id_};
|
||||
int trusted_by_source =
|
||||
static_cast<int>(static_cast<int8>(x_node->remote_source_) < static_cast<int8>(y_node->remote_source_));
|
||||
int trusted_by_source = merge_choose_file_source_location(x_node->remote_.full_source, y_node->remote_.full_source);
|
||||
|
||||
int local_i = merge_choose_local_location(x_node->local_, y_node->local_);
|
||||
int remote_i = merge_choose_remote_location(x_node->remote_, static_cast<int8>(x_node->remote_source_),
|
||||
y_node->remote_, static_cast<int8>(y_node->remote_source_));
|
||||
int remote_i = merge_choose_remote_location(x_node->remote_, y_node->remote_);
|
||||
int generate_i = merge_choose_generate_location(x_node->generate_, y_node->generate_);
|
||||
int size_i = merge_choose_size(x_node->size_, y_node->size_);
|
||||
int expected_size_i = merge_choose_expected_size(x_node->expected_size_, y_node->expected_size_);
|
||||
@ -1210,8 +1252,7 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
||||
<< y_node->size_);
|
||||
}
|
||||
if (encryption_key_i == -1) {
|
||||
if (nodes[remote_i]->remote_.type() == RemoteFileLocation::Type::Full &&
|
||||
nodes[local_i]->local_.type() != LocalFileLocation::Type::Partial) {
|
||||
if (nodes[remote_i]->remote_.full && nodes[local_i]->local_.type() != LocalFileLocation::Type::Partial) {
|
||||
//???
|
||||
LOG(ERROR) << "Different encryption key in files, but go Choose same key as remote location";
|
||||
encryption_key_i = remote_i;
|
||||
@ -1278,7 +1319,7 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
||||
|
||||
if (remote_i == other_node_i) {
|
||||
do_cancel_upload(node);
|
||||
node->set_remote_location(other_node->remote_, other_node->remote_source_, other_node->remote_ready_size_);
|
||||
node->set_new_remote_location(std::move(other_node->remote_));
|
||||
node->upload_id_ = other_node->upload_id_;
|
||||
node->upload_was_update_file_reference_ = other_node->upload_was_update_file_reference_;
|
||||
node->set_upload_priority(other_node->upload_priority_);
|
||||
@ -1510,7 +1551,7 @@ void FileManager::clear_from_pmc(FileNodePtr node) {
|
||||
data.local_ = node->local_;
|
||||
}
|
||||
if (file_view.has_remote_location()) {
|
||||
data.remote_ = node->remote_;
|
||||
data.remote_ = RemoteFileLocation(*node->remote_.full);
|
||||
}
|
||||
if (file_view.has_generate_location()) {
|
||||
data.generate_ = make_unique<FullGenerateFileLocation>(*node->generate_);
|
||||
@ -1537,7 +1578,11 @@ void FileManager::flush_to_pmc(FileNodePtr node, bool new_remote, bool new_local
|
||||
if (data.local_.type() == LocalFileLocation::Type::Full) {
|
||||
prepare_path_for_pmc(data.local_.full().file_type_, data.local_.full().path_);
|
||||
}
|
||||
data.remote_ = node->remote_;
|
||||
if (node->remote_.is_full_alive) {
|
||||
data.remote_ = RemoteFileLocation(node->remote_.full.value());
|
||||
} else if (node->remote_.partial) {
|
||||
data.remote_ = RemoteFileLocation(*node->remote_.partial);
|
||||
}
|
||||
if (node->generate_ != nullptr && !begins_with(node->generate_->conversion_, "#file_id#")) {
|
||||
data.generate_ = make_unique<FullGenerateFileLocation>(*node->generate_);
|
||||
}
|
||||
@ -1682,8 +1727,8 @@ bool FileManager::set_content(FileId file_id, BufferSlice bytes) {
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::SetContent});
|
||||
node->download_id_ = id;
|
||||
node->is_download_started_ = true;
|
||||
send_closure(file_load_manager_, &FileLoadManager::from_bytes, id, node->remote_.full().file_type_, std::move(bytes),
|
||||
node->suggested_name());
|
||||
send_closure(file_load_manager_, &FileLoadManager::from_bytes, id, node->remote_.full.value().file_type_,
|
||||
std::move(bytes), node->suggested_name());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1873,15 +1918,137 @@ void FileManager::run_download(FileNodePtr node) {
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Download});
|
||||
node->download_id_ = id;
|
||||
node->is_download_started_ = false;
|
||||
LOG(DEBUG) << "Run download of file " << file_id << " of size " << node->size_ << " from " << node->remote_.full()
|
||||
<< " with suggested name " << node->suggested_name() << " and encyption key " << node->encryption_key_;
|
||||
LOG(DEBUG) << "Run download of file " << file_id << " of size " << node->size_ << " from "
|
||||
<< node->remote_.full.value() << " with suggested name " << node->suggested_name() << " and encyption key "
|
||||
<< node->encryption_key_;
|
||||
auto download_offset = file_view.is_encrypted_any() ? 0 : node->download_offset_;
|
||||
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full(), node->local_, node->size_,
|
||||
node->suggested_name(), node->encryption_key_, node->can_search_locally_, download_offset, priority);
|
||||
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full.value(), node->local_,
|
||||
node->size_, node->suggested_name(), node->encryption_key_, node->can_search_locally_, download_offset,
|
||||
priority);
|
||||
}
|
||||
|
||||
class ForceUploadActor : public Actor {
|
||||
public:
|
||||
ForceUploadActor(FileId file_id, std::shared_ptr<FileManager::UploadCallback> callback, int32 new_priority,
|
||||
uint64 upload_order, ActorShared<> parent)
|
||||
: file_id_(file_id)
|
||||
, callback_(std::move(callback))
|
||||
, new_priority_(new_priority)
|
||||
, upload_order_(upload_order)
|
||||
, parent_(std::move(parent)) {
|
||||
}
|
||||
virtual ~ForceUploadActor() = default;
|
||||
|
||||
private:
|
||||
FileId file_id_;
|
||||
std::shared_ptr<FileManager::UploadCallback> callback_;
|
||||
int32 new_priority_;
|
||||
uint64 upload_order_;
|
||||
ActorShared<> parent_;
|
||||
bool is_active_{false};
|
||||
int attempt_{0};
|
||||
class UploadCallback : public FileManager::UploadCallback {
|
||||
public:
|
||||
virtual ~UploadCallback() {
|
||||
}
|
||||
UploadCallback(ActorId<ForceUploadActor> callback) : callback_(std::move(callback)) {
|
||||
}
|
||||
void on_upload_ok(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file) override {
|
||||
send_closure(callback_, &ForceUploadActor::on_upload_ok, std::move(input_file));
|
||||
}
|
||||
|
||||
void on_upload_encrypted_ok(FileId file_id, tl_object_ptr<telegram_api::InputEncryptedFile> input_file) override {
|
||||
send_closure(callback_, &ForceUploadActor::on_upload_encrypted_ok, std::move(input_file));
|
||||
}
|
||||
|
||||
void on_upload_secure_ok(FileId file_id, tl_object_ptr<telegram_api::InputSecureFile> input_file) override {
|
||||
send_closure(callback_, &ForceUploadActor::on_upload_secure_ok, std::move(input_file));
|
||||
}
|
||||
|
||||
void on_upload_error(FileId file_id, Status error) override {
|
||||
send_closure(callback_, &ForceUploadActor::on_upload_error, std::move(error));
|
||||
}
|
||||
|
||||
private:
|
||||
ActorId<ForceUploadActor> callback_;
|
||||
};
|
||||
|
||||
void on_upload_ok(tl_object_ptr<telegram_api::InputFile> input_file) {
|
||||
is_active_ = false;
|
||||
if (input_file || is_ready()) {
|
||||
callback_->on_upload_ok(file_id_, std::move(input_file));
|
||||
on_ok();
|
||||
} else {
|
||||
loop();
|
||||
}
|
||||
}
|
||||
|
||||
void on_upload_encrypted_ok(tl_object_ptr<telegram_api::InputEncryptedFile> input_file) {
|
||||
is_active_ = false;
|
||||
if (input_file || is_ready()) {
|
||||
callback_->on_upload_encrypted_ok(file_id_, std::move(input_file));
|
||||
on_ok();
|
||||
} else {
|
||||
loop();
|
||||
}
|
||||
}
|
||||
|
||||
void on_upload_secure_ok(tl_object_ptr<telegram_api::InputSecureFile> input_file) {
|
||||
is_active_ = false;
|
||||
if (input_file || is_ready()) {
|
||||
callback_->on_upload_secure_ok(file_id_, std::move(input_file));
|
||||
on_ok();
|
||||
} else {
|
||||
loop();
|
||||
}
|
||||
}
|
||||
|
||||
bool is_ready() {
|
||||
return G()->file_manager().get_actor_unsafe()->get_file_view(file_id_).has_active_upload_remote_location();
|
||||
}
|
||||
|
||||
void on_ok() {
|
||||
callback_.reset();
|
||||
stop();
|
||||
}
|
||||
|
||||
void on_upload_error(Status error) {
|
||||
if (attempt_ == 2) {
|
||||
callback_->on_upload_error(file_id_, std::move(error));
|
||||
callback_.reset();
|
||||
stop();
|
||||
}
|
||||
}
|
||||
|
||||
auto create_callback() {
|
||||
return std::make_shared<UploadCallback>(actor_id(this));
|
||||
}
|
||||
void loop() override {
|
||||
if (is_active_) {
|
||||
return;
|
||||
}
|
||||
|
||||
is_active_ = true;
|
||||
attempt_++;
|
||||
send_closure(G()->file_manager(), &FileManager::resume_upload, file_id_, std::vector<int>(), create_callback(),
|
||||
new_priority_, upload_order_, true);
|
||||
}
|
||||
|
||||
void tear_down() override {
|
||||
if (callback_) {
|
||||
callback_->on_upload_error(file_id_, Status::Error("Cancelled"));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void FileManager::resume_upload(FileId file_id, std::vector<int> bad_parts, std::shared_ptr<UploadCallback> callback,
|
||||
int32 new_priority, uint64 upload_order) {
|
||||
int32 new_priority, uint64 upload_order, bool force) {
|
||||
if (bad_parts.size() == 1 && bad_parts[0] == -1) {
|
||||
create_actor<ForceUploadActor>("ForceUploadActor", file_id, std::move(callback), new_priority, upload_order,
|
||||
context_->create_reference())
|
||||
.release();
|
||||
return;
|
||||
}
|
||||
LOG(INFO) << "Resume upload of file " << file_id << " with priority " << new_priority;
|
||||
|
||||
auto node = get_sync_file_node(file_id);
|
||||
@ -1892,6 +2059,9 @@ void FileManager::resume_upload(FileId file_id, std::vector<int> bad_parts, std:
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (force) {
|
||||
node->remote_.is_full_alive = false;
|
||||
}
|
||||
if (node->upload_pause_ == file_id) {
|
||||
node->set_upload_pause(FileId());
|
||||
}
|
||||
@ -1912,7 +2082,7 @@ void FileManager::resume_upload(FileId file_id, std::vector<int> bad_parts, std:
|
||||
}
|
||||
}
|
||||
|
||||
if (!file_view.has_local_location() && !file_view.has_generate_location() && !file_view.has_remote_location()) {
|
||||
if (!file_view.has_local_location() && !file_view.has_generate_location() && !file_view.has_alive_remote_location()) {
|
||||
LOG(INFO) << "File " << file_id << " can't be uploaded";
|
||||
if (callback) {
|
||||
callback->on_upload_error(file_id,
|
||||
@ -1943,12 +2113,12 @@ bool FileManager::delete_partial_remote_location(FileId file_id) {
|
||||
if (node->upload_pause_ == file_id) {
|
||||
node->set_upload_pause(FileId());
|
||||
}
|
||||
if (node->remote_.type() == RemoteFileLocation::Type::Full) {
|
||||
if (node->remote_.is_full_alive) {
|
||||
LOG(INFO) << "File " << file_id << " is already uploaded";
|
||||
return true;
|
||||
}
|
||||
|
||||
node->set_remote_location(RemoteFileLocation(), FileLocationSource::None, 0);
|
||||
node->delete_partial_remote_location();
|
||||
auto *file_info = get_file_id_info(file_id);
|
||||
file_info->upload_priority_ = 0;
|
||||
|
||||
@ -2156,7 +2326,7 @@ void FileManager::run_upload(FileNodePtr node, std::vector<int> bad_parts) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (node->remote_.type() != RemoteFileLocation::Type::Partial && node->get_by_hash_) {
|
||||
if (!node->remote_.partial && node->get_by_hash_) {
|
||||
LOG(INFO) << "Get file " << node->main_file_id_ << " by hash";
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::UploadByHash});
|
||||
node->upload_id_ = id;
|
||||
@ -2172,7 +2342,7 @@ void FileManager::run_upload(FileNodePtr node, std::vector<int> bad_parts) {
|
||||
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Upload});
|
||||
node->upload_id_ = id;
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload, id, node->local_, node->remote_,
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload, id, node->local_, node->remote_.partial_or_empty(),
|
||||
file_view.expected_size(true), node->encryption_key_, new_priority, std::move(bad_parts));
|
||||
|
||||
LOG(INFO) << "File " << file_id << " upload request has sent to FileLoadManager";
|
||||
@ -2304,7 +2474,7 @@ td_api::object_ptr<td_api::file> FileManager::get_file_object(FileId file_id, bo
|
||||
}
|
||||
|
||||
string persistent_file_id;
|
||||
if (file_view.has_remote_location()) {
|
||||
if (file_view.has_alive_remote_location()) {
|
||||
persistent_file_id = get_persistent_id(file_view.remote_location());
|
||||
} else if (file_view.has_url()) {
|
||||
persistent_file_id = file_view.url();
|
||||
@ -2690,7 +2860,7 @@ void FileManager::on_partial_upload(QueryId query_id, const PartialRemoteFileLoc
|
||||
return;
|
||||
}
|
||||
|
||||
file_node->set_remote_location(RemoteFileLocation(partial_remote), FileLocationSource::None, ready_size);
|
||||
file_node->set_partial_remote_location(partial_remote, ready_size);
|
||||
try_flush_node(file_node, "on_partial_upload");
|
||||
}
|
||||
|
||||
@ -2929,21 +3099,18 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
|
||||
node->drop_local_location();
|
||||
}
|
||||
}
|
||||
if (node->remote_.type() == RemoteFileLocation::Type::Partial) {
|
||||
node->set_remote_location(RemoteFileLocation(), FileLocationSource::None, 0);
|
||||
}
|
||||
node->delete_partial_remote_location();
|
||||
status = Status::Error(400, status.message());
|
||||
}
|
||||
}
|
||||
|
||||
if (status.message() == "FILE_PART_INVALID") {
|
||||
bool has_partial_small_location =
|
||||
node->remote_.type() == RemoteFileLocation::Type::Partial && !node->remote_.partial().is_big_;
|
||||
bool has_partial_small_location = node->remote_.partial && !node->remote_.partial->is_big_;
|
||||
FileView file_view(node);
|
||||
auto expected_size = file_view.expected_size(true);
|
||||
bool should_be_big_location = is_file_big(file_view.get_type(), expected_size);
|
||||
|
||||
node->set_remote_location(RemoteFileLocation(), FileLocationSource::None, 0);
|
||||
node->delete_partial_remote_location();
|
||||
if (has_partial_small_location && should_be_big_location) {
|
||||
run_upload(node, {});
|
||||
return;
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "td/utils/Container.h"
|
||||
#include "td/utils/Enumerator.h"
|
||||
#include "td/utils/logging.h"
|
||||
#include "td/utils/optional.h"
|
||||
#include "td/utils/Slice.h"
|
||||
#include "td/utils/Status.h"
|
||||
|
||||
@ -47,9 +48,24 @@ class FileDbInterface;
|
||||
|
||||
enum class FileLocationSource : int8 { None, FromUser, FromDb, FromServer };
|
||||
|
||||
struct NewRemoteFileLocation {
|
||||
NewRemoteFileLocation() = default;
|
||||
NewRemoteFileLocation(RemoteFileLocation remote, FileLocationSource source);
|
||||
RemoteFileLocation partial_or_empty() const;
|
||||
unique_ptr<PartialRemoteFileLocation> partial;
|
||||
|
||||
//TODO: use RemoteId
|
||||
// hardest part is to determine wether we should flush this location to db.
|
||||
// probably, will need some generation in RemoteInfo
|
||||
optional<FullRemoteFileLocation> full;
|
||||
bool is_full_alive{false}; // if false, then we may try to upload this file
|
||||
FileLocationSource full_source{FileLocationSource::None};
|
||||
int64 ready_size = 0;
|
||||
};
|
||||
|
||||
class FileNode {
|
||||
public:
|
||||
FileNode(LocalFileLocation local, RemoteFileLocation remote, unique_ptr<FullGenerateFileLocation> generate,
|
||||
FileNode(LocalFileLocation local, NewRemoteFileLocation remote, unique_ptr<FullGenerateFileLocation> generate,
|
||||
int64 size, int64 expected_size, string remote_name, string url, DialogId owner_dialog_id,
|
||||
FileEncryptionKey key, FileId main_file_id, int8 main_file_id_priority)
|
||||
: local_(std::move(local))
|
||||
@ -68,7 +84,10 @@ class FileNode {
|
||||
void drop_local_location();
|
||||
void set_local_location(const LocalFileLocation &local, int64 ready_size, int64 prefix_offset,
|
||||
int64 ready_prefix_size);
|
||||
void set_remote_location(const RemoteFileLocation &remote, FileLocationSource source, int64 ready_size);
|
||||
void set_new_remote_location(NewRemoteFileLocation remote);
|
||||
void delete_partial_remote_location();
|
||||
void set_partial_remote_location(const PartialRemoteFileLocation &remote, int64 ready_size);
|
||||
|
||||
bool delete_file_reference(Slice file_reference);
|
||||
void set_generate_location(unique_ptr<FullGenerateFileLocation> &&generate);
|
||||
void set_size(int64 size);
|
||||
@ -107,9 +126,9 @@ class FileNode {
|
||||
int64 local_ready_size_ = 0; // PartialLocal only
|
||||
int64 local_ready_prefix_size_ = 0; // PartialLocal only
|
||||
|
||||
RemoteFileLocation remote_;
|
||||
NewRemoteFileLocation remote_;
|
||||
|
||||
FileLoadManager::QueryId download_id_ = 0;
|
||||
int64 remote_ready_size_ = 0;
|
||||
|
||||
unique_ptr<FullGenerateFileLocation> generate_;
|
||||
FileLoadManager::QueryId generate_id_ = 0;
|
||||
@ -135,8 +154,6 @@ class FileNode {
|
||||
|
||||
int8 main_file_id_priority_ = 0;
|
||||
|
||||
FileLocationSource remote_source_ = FileLocationSource::FromUser;
|
||||
|
||||
bool is_download_offset_dirty_ = false;
|
||||
|
||||
bool get_by_hash_ = false;
|
||||
@ -212,6 +229,7 @@ class FileView {
|
||||
bool has_local_location() const;
|
||||
const FullLocalFileLocation &local_location() const;
|
||||
bool has_remote_location() const;
|
||||
bool has_alive_remote_location() const;
|
||||
bool has_active_upload_remote_location() const;
|
||||
bool has_active_download_remote_location() const;
|
||||
const FullRemoteFileLocation &remote_location() const;
|
||||
@ -380,7 +398,7 @@ class FileManager : public FileLoadManager::Callback {
|
||||
void download(FileId file_id, std::shared_ptr<DownloadCallback> callback, int32 new_priority, int64 offset);
|
||||
void upload(FileId file_id, std::shared_ptr<UploadCallback> callback, int32 new_priority, uint64 upload_order);
|
||||
void resume_upload(FileId file_id, std::vector<int> bad_parts, std::shared_ptr<UploadCallback> callback,
|
||||
int32 new_priority, uint64 upload_order);
|
||||
int32 new_priority, uint64 upload_order, bool force = false);
|
||||
void cancel_upload(FileId file_id);
|
||||
bool delete_partial_remote_location(FileId file_id);
|
||||
void delete_file_reference(FileId file_id, std::string file_reference);
|
||||
|
@ -35,6 +35,7 @@ class optional {
|
||||
} else {
|
||||
impl_ = Result<T>();
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
optional(optional &&other) = default;
|
||||
|
Reference in New Issue
Block a user