FileReference: minor improvements
GitOrigin-RevId: 22c605f90445d7d6aefefea16c834051e29b1fed
This commit is contained in:
parent
a75726d77a
commit
bbecdcda20
@ -20,52 +20,41 @@ FileSourceId FileReferenceManager::create_file_source(FullMessageId full_message
|
||||
if (it != from_full_message_id_.end()) {
|
||||
return it->second;
|
||||
}
|
||||
++last_file_source_id_;
|
||||
to_full_message_id_[last_file_source_id_] = full_message_id;
|
||||
from_full_message_id_[full_message_id] = FileSourceId{last_file_source_id_};
|
||||
return FileSourceId{last_file_source_id_};
|
||||
auto source_id = FileSourceId{++last_file_source_id_};
|
||||
to_full_message_id_[source_id] = full_message_id;
|
||||
from_full_message_id_[full_message_id] = source_id;
|
||||
return source_id;
|
||||
}
|
||||
|
||||
void FileReferenceManager::update_file_reference(FileId file_id, std::vector<FileSourceId> sources, Promise<> promise) {
|
||||
LOG(ERROR) << "update file reference: " << file_id << " " << format::as_array(sources);
|
||||
//if (td::Random::fast(0, 3) == 0) {
|
||||
//promise.set_error(td::Status::Error("Error"));
|
||||
//return;
|
||||
//}
|
||||
|
||||
//if (td::Random::fast(0, 3) == 0) {
|
||||
//promise.set_value(Unit());
|
||||
//return;
|
||||
//}
|
||||
|
||||
LOG(INFO) << "Trying to load valid file_reference from server: " << file_id << " " << format::as_array(sources);
|
||||
MultiPromiseActorSafe mpas{"UpdateFileReferenceMultiPromiseActor"};
|
||||
mpas.add_promise(std::move(promise));
|
||||
auto lock = mpas.get_promise();
|
||||
SCOPE_EXIT {
|
||||
lock.set_value(Unit());
|
||||
};
|
||||
for (auto &source_id : sources) {
|
||||
for (auto source_id : sources) {
|
||||
auto it = to_full_message_id_.find(source_id);
|
||||
if (it != to_full_message_id_.end()) {
|
||||
std::vector<FullMessageId> message_ids = {it->second};
|
||||
auto new_promise = PromiseCreator::lambda([promise = mpas.get_promise(), file_id, source_id,
|
||||
file_manager = G()->file_manager()](Result<Unit> res) mutable {
|
||||
if (res.is_error()) {
|
||||
send_closure(file_manager, &FileManager::remove_file_source, file_id, source_id);
|
||||
send_lambda(file_manager, [promise = std::move(promise)]() mutable { promise.set_value({}); });
|
||||
} else {
|
||||
promise.set_value(Unit());
|
||||
}
|
||||
});
|
||||
|
||||
LOG(ERROR) << "Ask for " << it->second;
|
||||
send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server, std::move(message_ids),
|
||||
std::move(new_promise), nullptr);
|
||||
} else {
|
||||
LOG(ERROR) << "Invalid source id " << source_id << " " << file_id;
|
||||
send_closure(G()->file_manager(), &FileManager::remove_file_source, file_id, source_id);
|
||||
send_lambda(G()->file_manager(), [promise = mpas.get_promise()]() mutable { promise.set_value(Unit()); });
|
||||
auto new_promise = PromiseCreator::lambda([promise = mpas.get_promise(), file_id, source_id,
|
||||
file_manager = G()->file_manager()](Result<Unit> res) mutable {
|
||||
if (res.is_error()) {
|
||||
LOG(INFO) << "Invalid source id " << source_id << " " << res.error();
|
||||
send_closure(file_manager, &FileManager::remove_file_source, file_id, source_id);
|
||||
}
|
||||
// NB: main promise must send closure to FileManager
|
||||
// So the closure will be executed only after the bad source id is removed
|
||||
promise.set_value(Unit());
|
||||
});
|
||||
if (it == to_full_message_id_.end()) {
|
||||
new_promise.set_error(Status::Error("Unkonwn source id"));
|
||||
continue;
|
||||
}
|
||||
|
||||
std::vector<FullMessageId> message_ids = {it->second};
|
||||
LOG(INFO) << source_id << ": load message from server " << it->second;
|
||||
send_closure_later(G()->messages_manager(), &MessagesManager::get_messages_from_server, std::move(message_ids),
|
||||
std::move(new_promise), nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ class FileReferenceManager : public Actor {
|
||||
|
||||
private:
|
||||
td::int32 last_file_source_id_{0};
|
||||
std::map<FileSourceId, FullMessageId> to_full_message_id_;
|
||||
std::unordered_map<FileSourceId, FullMessageId, FileSourceIdHash> to_full_message_id_;
|
||||
std::unordered_map<FullMessageId, FileSourceId, FullMessageIdHash> from_full_message_id_;
|
||||
};
|
||||
|
||||
|
@ -272,7 +272,8 @@ Status FileDownloader::check_net_query(NetQueryPtr &net_query) {
|
||||
if (net_query->is_error()) {
|
||||
auto error = net_query->move_as_error();
|
||||
if (begins_with(error.message(), "FILE_REFERENCE_")) {
|
||||
error = Status::Error(400, PSLICE() << "FILE_REFERENCE_BASE64" << base64_encode(remote_.get_file_reference()));
|
||||
error = Status::Error(400, PSLICE()
|
||||
<< "FILE_REFERENCE_EXPIRED_BASE64" << base64_encode(remote_.get_file_reference()));
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
@ -14,7 +14,50 @@
|
||||
|
||||
namespace td {
|
||||
|
||||
using FileSourceId = int32;
|
||||
class FileSourceId {
|
||||
int32 id = 0;
|
||||
|
||||
public:
|
||||
FileSourceId() = default;
|
||||
|
||||
explicit FileSourceId(int32 file_id) : id(file_id) {
|
||||
}
|
||||
template <class T1, typename = std::enable_if_t<std::is_convertible<T1, int32>::value>>
|
||||
FileSourceId(T1 file_id) = delete;
|
||||
|
||||
bool empty() const {
|
||||
return id <= 0;
|
||||
}
|
||||
bool is_valid() const {
|
||||
return id > 0;
|
||||
}
|
||||
|
||||
int32 get() const {
|
||||
return id;
|
||||
}
|
||||
|
||||
bool operator<(const FileSourceId &other) const {
|
||||
return id < other.id;
|
||||
}
|
||||
|
||||
bool operator==(const FileSourceId &other) const {
|
||||
return id == other.id;
|
||||
}
|
||||
|
||||
bool operator!=(const FileSourceId &other) const {
|
||||
return id != other.id;
|
||||
}
|
||||
};
|
||||
|
||||
struct FileSourceIdHash {
|
||||
std::size_t operator()(FileSourceId file_id) const {
|
||||
return std::hash<int32>()(file_id.get());
|
||||
}
|
||||
};
|
||||
|
||||
inline StringBuilder &operator<<(StringBuilder &string_builder, FileSourceId file_id) {
|
||||
return string_builder << "FileSourceId(" << file_id.get() << ")";
|
||||
}
|
||||
|
||||
class FileId {
|
||||
int32 id = 0;
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include "td/telegram/files/FileEncryptionKey.h"
|
||||
#include "td/telegram/net/DcId.h"
|
||||
|
||||
#include "td/utils/base64.h"
|
||||
#include "td/utils/buffer.h"
|
||||
#include "td/utils/common.h"
|
||||
#include "td/utils/format.h"
|
||||
@ -549,6 +550,9 @@ class FullRemoteFileLocation {
|
||||
file_reference_ = {};
|
||||
return true;
|
||||
}
|
||||
bool has_file_reference() const {
|
||||
return !file_reference_.empty();
|
||||
}
|
||||
string get_file_reference() const {
|
||||
return file_reference_;
|
||||
}
|
||||
@ -715,8 +719,7 @@ inline StringBuilder &operator<<(StringBuilder &string_builder,
|
||||
string_builder << ", " << full_remote_file_location.get_dc_id();
|
||||
}
|
||||
if (!full_remote_file_location.file_reference_.empty()) {
|
||||
string_builder << ", "
|
||||
<< tag("file_reference", format::as_hex_dump<0>(Slice(full_remote_file_location.file_reference_)));
|
||||
string_builder << ", " << tag("file_reference", base64_encode(full_remote_file_location.file_reference_));
|
||||
}
|
||||
|
||||
string_builder << ", location = ";
|
||||
|
@ -362,7 +362,7 @@ bool FileView::has_active_remote_location() const {
|
||||
if (remote_location().is_encrypted_any()) {
|
||||
return true;
|
||||
}
|
||||
return !remote_location().get_file_reference().empty();
|
||||
return remote_location().has_file_reference();
|
||||
}
|
||||
const FullRemoteFileLocation &FileView::remote_location() const {
|
||||
CHECK(has_remote_location());
|
||||
@ -983,8 +983,8 @@ static int merge_choose_remote_location(const RemoteFileLocation &x, int8 x_sour
|
||||
if (x.full().is_web() != y.full().is_web()) {
|
||||
return x.full().is_web(); // prefer non-web
|
||||
}
|
||||
auto x_ref = !x.full().get_file_reference().empty();
|
||||
auto y_ref = !y.full().get_file_reference().empty();
|
||||
auto x_ref = x.full().has_file_reference();
|
||||
auto y_ref = !y.full().has_file_reference();
|
||||
if (x_ref || y_ref) {
|
||||
if (x_ref != y_ref) {
|
||||
return !x_ref;
|
||||
@ -1103,7 +1103,7 @@ void FileManager::cancel_generate(FileNodePtr node) {
|
||||
}
|
||||
|
||||
Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sync) {
|
||||
LOG(ERROR) << x_file_id << " VS " << y_file_id;
|
||||
LOG(DEBUG) << x_file_id << " VS " << y_file_id;
|
||||
|
||||
if (!x_file_id.is_valid()) {
|
||||
return Status::Error("First file_id is invalid");
|
||||
@ -1197,8 +1197,6 @@ Result<FileId> FileManager::merge(FileId x_file_id, FileId y_file_id, bool no_sy
|
||||
<< ", generate_i = " << generate_i << ", size_i = " << size_i << ", remote_name_i = " << remote_name_i
|
||||
<< ", url_i = " << url_i << ", owner_i = " << owner_i << ", encryption_key_i = " << encryption_key_i
|
||||
<< ", main_file_id_i = " << main_file_id_i;
|
||||
LOG(ERROR) << FileView(node).has_active_remote_location();
|
||||
LOG(ERROR) << FileView(other_node).has_active_remote_location();
|
||||
if (local_i == other_node_i) {
|
||||
cancel_download(node);
|
||||
node->set_download_offset(other_node->download_offset_);
|
||||
@ -1747,25 +1745,26 @@ void FileManager::run_download(FileNodePtr node) {
|
||||
|
||||
// If file reference is needed
|
||||
if (!file_view.has_active_remote_location()) {
|
||||
LOG(ERROR) << "run_download: no active location";
|
||||
LOG(INFO) << "run_download: Do not have valid file_reference " << file_id;
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Download});
|
||||
node->download_id_ = id;
|
||||
if (node->file_source_ids_.empty()) {
|
||||
on_error(id, Status::Error("Can't download file: no valid file refernce and no valid source id"));
|
||||
on_error(id, Status::Error("Can't download file: have valid file reference and no valid source id"));
|
||||
return;
|
||||
}
|
||||
|
||||
send_closure(G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id,
|
||||
node->file_source_ids_, PromiseCreator::lambda([id, actor_id = actor_id(this)](Result<Unit> res) {
|
||||
Status error;
|
||||
LOG(ERROR) << "run_download: update_file_reference finished";
|
||||
if (res.is_ok()) {
|
||||
error = td::Status::Error("FILE_DOWNLOAD_RESTART_WITH_FILE_REFERENCE");
|
||||
} else {
|
||||
error = res.move_as_error();
|
||||
}
|
||||
send_closure(actor_id, &FileManager::on_error, id, std::move(error));
|
||||
}));
|
||||
send_closure(
|
||||
G()->file_reference_manager(), &FileReferenceManager::update_file_reference, file_id, node->file_source_ids_,
|
||||
PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||
Status error;
|
||||
if (res.is_ok()) {
|
||||
error = td::Status::Error("FILE_DOWNLOAD_RESTART_WITH_FILE_REFERENCE");
|
||||
} else {
|
||||
error = res.move_as_error();
|
||||
}
|
||||
LOG(INFO) << "run_download: Got result from FileSourceManager for file " << file_id << " : " << error;
|
||||
send_closure(actor_id, &FileManager::on_error, id, std::move(error));
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1867,7 +1866,7 @@ bool FileManager::delete_partial_remote_location(FileId file_id) {
|
||||
}
|
||||
|
||||
void FileManager::delete_file_reference(FileId file_id, std::string file_reference) {
|
||||
LOG(ERROR) << "Delete file reference " << file_id << tag("reference", format::as_hex_dump<0>(Slice(file_reference)));
|
||||
LOG(INFO) << "Delete file reference to file " << file_id << " " << tag("reference", base64_encode(file_reference));
|
||||
auto node = get_sync_file_node(file_id);
|
||||
if (!node) {
|
||||
LOG(INFO) << "Wrong file id " << file_id;
|
||||
@ -2714,7 +2713,6 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
|
||||
SCOPE_EXIT {
|
||||
try_flush_node(node, "on_error");
|
||||
};
|
||||
//FIXME: should we apply error if !was_active?
|
||||
if (status.code() != 1 && !G()->close_flag()) {
|
||||
LOG(WARNING) << "Failed to upload/download/generate file: " << status << ". Query type = " << type
|
||||
<< ". File type is " << FileView(node).get_type();
|
||||
@ -2758,13 +2756,15 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
|
||||
}
|
||||
if (begins_with(status.message(), "FILE_REFERENCE_")) {
|
||||
string file_reference;
|
||||
Slice prefix = "FILE_REFERENCE_BASE64";
|
||||
Slice prefix = "FILE_REFERENCE_EXPIRED_BASE64";
|
||||
if (begins_with(status.message(), prefix)) {
|
||||
auto tmp = base64_decode(status.message().substr(prefix.size()));
|
||||
LOG_IF(WARNING, tmp.is_error()) << "Can't decode file reference from error " << status << " " << tmp.error();
|
||||
if (tmp.is_ok()) {
|
||||
file_reference = tmp.move_as_ok();
|
||||
}
|
||||
} else {
|
||||
LOG(ERROR) << "Unexpected error, file_reference will be deleted just in case " << status;
|
||||
}
|
||||
CHECK(!node->file_ids_.empty());
|
||||
delete_file_reference(node->file_ids_.back(), file_reference);
|
||||
@ -2779,12 +2779,12 @@ void FileManager::on_error_impl(FileNodePtr node, FileManager::Query::Type type,
|
||||
if (begins_with(status.message(), "FILE_DOWNLOAD_RESTART")) {
|
||||
if (ends_with(status.message(), "WITH_FILE_REFERENCE")) {
|
||||
if (FileView(node).has_active_remote_location()) {
|
||||
LOG(ERROR) << "??????";
|
||||
run_download(node);
|
||||
return;
|
||||
}
|
||||
LOG_IF(WARNING, !node->file_source_ids_.empty())
|
||||
<< "Got no active remote locations " << FileView(node).remote_location();
|
||||
<< "Got no active remote locations, but have file_source_ids, download will be cancelled "
|
||||
<< FileView(node).remote_location();
|
||||
} else {
|
||||
node->can_search_locally_ = false;
|
||||
run_download(node);
|
||||
|
Loading…
x
Reference in New Issue
Block a user