Improve "id" variable names in FileManager.
This commit is contained in:
parent
ad8f0c4146
commit
4408af6643
@ -60,8 +60,8 @@ class FileDb final : public FileDbInterface {
|
||||
public:
|
||||
class FileDbActor final : public Actor {
|
||||
public:
|
||||
FileDbActor(FileDbId current_pmc_id, std::shared_ptr<SqliteKeyValueSafe> file_kv_safe)
|
||||
: current_pmc_id_(current_pmc_id), file_kv_safe_(std::move(file_kv_safe)) {
|
||||
FileDbActor(FileDbId max_file_db_id, std::shared_ptr<SqliteKeyValueSafe> file_kv_safe)
|
||||
: max_file_db_id_(max_file_db_id), file_kv_safe_(std::move(file_kv_safe)) {
|
||||
}
|
||||
|
||||
void close(Promise<> promise) {
|
||||
@ -72,20 +72,21 @@ class FileDb final : public FileDbInterface {
|
||||
}
|
||||
|
||||
void load_file_data(const string &key, Promise<FileData> promise) {
|
||||
promise.set_result(load_file_data_impl(actor_id(this), file_pmc(), key, current_pmc_id_));
|
||||
promise.set_result(load_file_data_impl(actor_id(this), file_pmc(), key, max_file_db_id_));
|
||||
}
|
||||
|
||||
void clear_file_data(FileDbId id, const string &remote_key, const string &local_key, const string &generate_key) {
|
||||
void clear_file_data(FileDbId file_db_id, const string &remote_key, const string &local_key,
|
||||
const string &generate_key) {
|
||||
auto &pmc = file_pmc();
|
||||
pmc.begin_write_transaction().ensure();
|
||||
|
||||
if (id > current_pmc_id_) {
|
||||
pmc.set("file_id", to_string(id.get()));
|
||||
current_pmc_id_ = id;
|
||||
if (file_db_id > max_file_db_id_) {
|
||||
pmc.set("file_id", to_string(file_db_id.get()));
|
||||
max_file_db_id_ = file_db_id;
|
||||
}
|
||||
|
||||
pmc.erase(PSTRING() << "file" << id.get());
|
||||
// LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(PSLICE() << "file" << id.get()));
|
||||
pmc.erase(PSTRING() << "file" << file_db_id.get());
|
||||
// LOG(DEBUG) << "ERASE " << format::as_hex_dump<4>(Slice(PSLICE() << "file" << file_db_id.get()));
|
||||
|
||||
if (!remote_key.empty()) {
|
||||
pmc.erase(remote_key);
|
||||
@ -102,79 +103,79 @@ class FileDb final : public FileDbInterface {
|
||||
pmc.commit_transaction().ensure();
|
||||
}
|
||||
|
||||
void store_file_data(FileDbId id, const string &file_data, const string &remote_key, const string &local_key,
|
||||
const string &generate_key) {
|
||||
void store_file_data(FileDbId file_db_id, const string &file_data, const string &remote_key,
|
||||
const string &local_key, const string &generate_key) {
|
||||
auto &pmc = file_pmc();
|
||||
pmc.begin_write_transaction().ensure();
|
||||
|
||||
if (id > current_pmc_id_) {
|
||||
pmc.set("file_id", to_string(id.get()));
|
||||
current_pmc_id_ = id;
|
||||
if (file_db_id > max_file_db_id_) {
|
||||
pmc.set("file_id", to_string(file_db_id.get()));
|
||||
max_file_db_id_ = file_db_id;
|
||||
}
|
||||
|
||||
pmc.set(PSTRING() << "file" << id.get(), file_data);
|
||||
pmc.set(PSTRING() << "file" << file_db_id.get(), file_data);
|
||||
|
||||
if (!remote_key.empty()) {
|
||||
pmc.set(remote_key, to_string(id.get()));
|
||||
pmc.set(remote_key, to_string(file_db_id.get()));
|
||||
}
|
||||
if (!local_key.empty()) {
|
||||
pmc.set(local_key, to_string(id.get()));
|
||||
pmc.set(local_key, to_string(file_db_id.get()));
|
||||
}
|
||||
if (!generate_key.empty()) {
|
||||
pmc.set(generate_key, to_string(id.get()));
|
||||
pmc.set(generate_key, to_string(file_db_id.get()));
|
||||
}
|
||||
|
||||
pmc.commit_transaction().ensure();
|
||||
}
|
||||
|
||||
void store_file_data_ref(FileDbId id, FileDbId new_id) {
|
||||
void store_file_data_ref(FileDbId file_db_id, FileDbId new_file_db_id) {
|
||||
auto &pmc = file_pmc();
|
||||
pmc.begin_write_transaction().ensure();
|
||||
|
||||
if (id > current_pmc_id_) {
|
||||
pmc.set("file_id", to_string(id.get()));
|
||||
current_pmc_id_ = id;
|
||||
if (file_db_id > max_file_db_id_) {
|
||||
pmc.set("file_id", to_string(file_db_id.get()));
|
||||
max_file_db_id_ = file_db_id;
|
||||
}
|
||||
|
||||
do_store_file_data_ref(id, new_id);
|
||||
do_store_file_data_ref(file_db_id, new_file_db_id);
|
||||
|
||||
pmc.commit_transaction().ensure();
|
||||
}
|
||||
|
||||
void optimize_refs(std::vector<FileDbId> ids, FileDbId main_id) {
|
||||
LOG(INFO) << "Optimize " << ids.size() << " ids in file database to " << main_id.get();
|
||||
void optimize_refs(std::vector<FileDbId> file_db_ids, FileDbId main_file_db_id) {
|
||||
LOG(INFO) << "Optimize " << file_db_ids.size() << " file_db_ids in file database to " << main_file_db_id.get();
|
||||
auto &pmc = file_pmc();
|
||||
pmc.begin_write_transaction().ensure();
|
||||
for (size_t i = 0; i + 1 < ids.size(); i++) {
|
||||
do_store_file_data_ref(ids[i], main_id);
|
||||
for (size_t i = 0; i + 1 < file_db_ids.size(); i++) {
|
||||
do_store_file_data_ref(file_db_ids[i], main_file_db_id);
|
||||
}
|
||||
pmc.commit_transaction().ensure();
|
||||
}
|
||||
|
||||
private:
|
||||
FileDbId current_pmc_id_;
|
||||
FileDbId max_file_db_id_;
|
||||
std::shared_ptr<SqliteKeyValueSafe> file_kv_safe_;
|
||||
|
||||
SqliteKeyValue &file_pmc() {
|
||||
return file_kv_safe_->get();
|
||||
}
|
||||
|
||||
void do_store_file_data_ref(FileDbId id, FileDbId new_id) {
|
||||
file_pmc().set(PSTRING() << "file" << id.get(), PSTRING() << "@@" << new_id.get());
|
||||
void do_store_file_data_ref(FileDbId file_db_id, FileDbId new_file_db_id) {
|
||||
file_pmc().set(PSTRING() << "file" << file_db_id.get(), PSTRING() << "@@" << new_file_db_id.get());
|
||||
}
|
||||
};
|
||||
|
||||
explicit FileDb(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int scheduler_id = -1) {
|
||||
file_kv_safe_ = std::move(kv_safe);
|
||||
CHECK(file_kv_safe_);
|
||||
current_pmc_id_ = FileDbId(to_integer<uint64>(file_kv_safe_->get().get("file_id")));
|
||||
max_file_db_id_ = FileDbId(to_integer<uint64>(file_kv_safe_->get().get("file_id")));
|
||||
file_db_actor_ =
|
||||
create_actor_on_scheduler<FileDbActor>("FileDbActor", scheduler_id, current_pmc_id_, file_kv_safe_);
|
||||
create_actor_on_scheduler<FileDbActor>("FileDbActor", scheduler_id, max_file_db_id_, file_kv_safe_);
|
||||
}
|
||||
|
||||
FileDbId create_pmc_id() final {
|
||||
current_pmc_id_ = FileDbId(current_pmc_id_.get() + 1);
|
||||
return current_pmc_id_;
|
||||
FileDbId get_next_file_db_id() final {
|
||||
max_file_db_id_ = FileDbId(max_file_db_id_.get() + 1);
|
||||
return max_file_db_id_;
|
||||
}
|
||||
|
||||
void close(Promise<> promise) final {
|
||||
@ -186,10 +187,10 @@ class FileDb final : public FileDbInterface {
|
||||
}
|
||||
|
||||
Result<FileData> get_file_data_sync_impl(string key) final {
|
||||
return load_file_data_impl(file_db_actor_.get(), file_kv_safe_->get(), key, current_pmc_id_);
|
||||
return load_file_data_impl(file_db_actor_.get(), file_kv_safe_->get(), key, max_file_db_id_);
|
||||
}
|
||||
|
||||
void clear_file_data(FileDbId id, const FileData &file_data) final {
|
||||
void clear_file_data(FileDbId file_db_id, const FileData &file_data) final {
|
||||
string remote_key;
|
||||
if (file_data.remote_.type() == RemoteFileLocation::Type::Full) {
|
||||
remote_key = as_key(file_data.remote_.full());
|
||||
@ -202,10 +203,11 @@ class FileDb final : public FileDbInterface {
|
||||
if (file_data.generate_ != nullptr) {
|
||||
generate_key = as_key(*file_data.generate_);
|
||||
}
|
||||
send_closure(file_db_actor_, &FileDbActor::clear_file_data, id, remote_key, local_key, generate_key);
|
||||
send_closure(file_db_actor_, &FileDbActor::clear_file_data, file_db_id, remote_key, local_key, generate_key);
|
||||
}
|
||||
|
||||
void set_file_data(FileDbId id, const FileData &file_data, bool new_remote, bool new_local, bool new_generate) final {
|
||||
void set_file_data(FileDbId file_db_id, const FileData &file_data, bool new_remote, bool new_local,
|
||||
bool new_generate) final {
|
||||
string remote_key;
|
||||
if (file_data.remote_.type() == RemoteFileLocation::Type::Full && new_remote) {
|
||||
remote_key = as_key(file_data.remote_.full());
|
||||
@ -218,16 +220,16 @@ class FileDb final : public FileDbInterface {
|
||||
if (file_data.generate_ != nullptr && new_generate) {
|
||||
generate_key = as_key(*file_data.generate_);
|
||||
}
|
||||
// LOG(DEBUG) << "SAVE " << id.get() << " -> " << file_data << " "
|
||||
// LOG(DEBUG) << "SAVE " << file_db_id.get() << " -> " << file_data << " "
|
||||
// << tag("remote_key", format::as_hex_dump<4>(Slice(remote_key)))
|
||||
// << tag("local_key", format::as_hex_dump<4>(Slice(local_key)))
|
||||
// << tag("generate_key", format::as_hex_dump<4>(Slice(generate_key)));
|
||||
send_closure(file_db_actor_, &FileDbActor::store_file_data, id, serialize(file_data), remote_key, local_key,
|
||||
send_closure(file_db_actor_, &FileDbActor::store_file_data, file_db_id, serialize(file_data), remote_key, local_key,
|
||||
generate_key);
|
||||
}
|
||||
|
||||
void set_file_data_ref(FileDbId id, FileDbId new_id) final {
|
||||
send_closure(file_db_actor_, &FileDbActor::store_file_data_ref, id, new_id);
|
||||
void set_file_data_ref(FileDbId file_db_id, FileDbId new_file_db_id) final {
|
||||
send_closure(file_db_actor_, &FileDbActor::store_file_data_ref, file_db_id, new_file_db_id);
|
||||
}
|
||||
SqliteKeyValue &pmc() final {
|
||||
return file_kv_safe_->get();
|
||||
@ -235,39 +237,39 @@ class FileDb final : public FileDbInterface {
|
||||
|
||||
private:
|
||||
ActorOwn<FileDbActor> file_db_actor_;
|
||||
FileDbId current_pmc_id_;
|
||||
FileDbId max_file_db_id_;
|
||||
std::shared_ptr<SqliteKeyValueSafe> file_kv_safe_;
|
||||
|
||||
static Result<FileData> load_file_data_impl(ActorId<FileDbActor> file_db_actor_id, SqliteKeyValue &pmc,
|
||||
const string &key, FileDbId current_pmc_id) {
|
||||
const string &key, FileDbId max_file_db_id) {
|
||||
// LOG(DEBUG) << "Load by key " << format::as_hex_dump<4>(Slice(key));
|
||||
TRY_RESULT(id, get_id(pmc, key));
|
||||
TRY_RESULT(file_db_id, get_file_db_id(pmc, key));
|
||||
|
||||
vector<FileDbId> ids;
|
||||
vector<FileDbId> file_db_ids;
|
||||
string data_str;
|
||||
int attempt_count = 0;
|
||||
while (true) {
|
||||
if (attempt_count > 100) {
|
||||
LOG(FATAL) << "Cycle in file database? current_pmc_id=" << current_pmc_id << " key=" << key
|
||||
<< " links=" << format::as_array(ids);
|
||||
LOG(FATAL) << "Cycle in file database? max_file_db_id=" << max_file_db_id << " key=" << key
|
||||
<< " links=" << format::as_array(file_db_ids);
|
||||
}
|
||||
attempt_count++;
|
||||
|
||||
data_str = pmc.get(PSTRING() << "file" << id.get());
|
||||
data_str = pmc.get(PSTRING() << "file" << file_db_id.get());
|
||||
auto data_slice = Slice(data_str);
|
||||
|
||||
if (data_slice.substr(0, 2) == "@@") {
|
||||
ids.push_back(id);
|
||||
file_db_ids.push_back(file_db_id);
|
||||
|
||||
id = FileDbId(to_integer<uint64>(data_slice.substr(2)));
|
||||
file_db_id = FileDbId(to_integer<uint64>(data_slice.substr(2)));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (ids.size() > 1) {
|
||||
send_closure(file_db_actor_id, &FileDbActor::optimize_refs, std::move(ids), id);
|
||||
if (file_db_ids.size() > 1) {
|
||||
send_closure(file_db_actor_id, &FileDbActor::optimize_refs, std::move(file_db_ids), file_db_id);
|
||||
}
|
||||
// LOG(DEBUG) << "By ID " << id.get() << " found data " << format::as_hex_dump<4>(Slice(data_str));
|
||||
// LOG(DEBUG) << "By ID " << file_db_id.get() << " found data " << format::as_hex_dump<4>(Slice(data_str));
|
||||
// LOG(INFO) << attempt_count;
|
||||
|
||||
log_event::WithVersion<TlParser> parser(data_str);
|
||||
@ -282,13 +284,13 @@ class FileDb final : public FileDbInterface {
|
||||
return std::move(data);
|
||||
}
|
||||
|
||||
static Result<FileDbId> get_id(SqliteKeyValue &pmc, const string &key) TD_WARN_UNUSED_RESULT {
|
||||
auto id_str = pmc.get(key);
|
||||
// LOG(DEBUG) << "Found ID " << id_str << " by key " << format::as_hex_dump<4>(Slice(key));
|
||||
if (id_str.empty()) {
|
||||
return Status::Error("There is no such a key in database");
|
||||
static Result<FileDbId> get_file_db_id(SqliteKeyValue &pmc, const string &key) TD_WARN_UNUSED_RESULT {
|
||||
auto file_db_id_str = pmc.get(key);
|
||||
// LOG(DEBUG) << "Found ID " << file_db_id_str << " by key " << format::as_hex_dump<4>(Slice(key));
|
||||
if (file_db_id_str.empty()) {
|
||||
return Status::Error("There is no such key in the database");
|
||||
}
|
||||
return FileDbId(to_integer<uint64>(id_str));
|
||||
return FileDbId(to_integer<uint64>(file_db_id_str));
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -39,7 +39,7 @@ class FileDbInterface {
|
||||
virtual ~FileDbInterface() = default;
|
||||
|
||||
// non-thread-safe
|
||||
virtual FileDbId create_pmc_id() = 0;
|
||||
virtual FileDbId get_next_file_db_id() = 0;
|
||||
|
||||
// thread-safe
|
||||
virtual void close(Promise<> promise) = 0;
|
||||
@ -75,10 +75,10 @@ class FileDbInterface {
|
||||
return res;
|
||||
}
|
||||
|
||||
virtual void clear_file_data(FileDbId id, const FileData &file_data) = 0;
|
||||
virtual void set_file_data(FileDbId id, const FileData &file_data, bool new_remote, bool new_local,
|
||||
virtual void clear_file_data(FileDbId file_db_id, const FileData &file_data) = 0;
|
||||
virtual void set_file_data(FileDbId file_db_id, const FileData &file_data, bool new_remote, bool new_local,
|
||||
bool new_generate) = 0;
|
||||
virtual void set_file_data_ref(FileDbId id, FileDbId new_id) = 0;
|
||||
virtual void set_file_data_ref(FileDbId file_db_id, FileDbId new_file_db_id) = 0;
|
||||
|
||||
// For FileStatsWorker. TODO: remove it
|
||||
virtual SqliteKeyValue &pmc() = 0;
|
||||
|
@ -51,8 +51,8 @@ class FileDbId {
|
||||
}
|
||||
};
|
||||
|
||||
inline StringBuilder &operator<<(StringBuilder &sb, const FileDbId &id) {
|
||||
return sb << "FileDbId{" << id.get() << "}";
|
||||
inline StringBuilder &operator<<(StringBuilder &sb, const FileDbId &file_db_id) {
|
||||
return sb << "FileDbId{" << file_db_id.get() << "}";
|
||||
}
|
||||
|
||||
} // namespace td
|
||||
|
@ -245,16 +245,16 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32
|
||||
}
|
||||
#endif
|
||||
DcId dc_id = remote_.is_web() ? G()->get_webfile_dc_id() : remote_.get_dc_id();
|
||||
auto id = UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Default));
|
||||
auto unique_id = UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::Default));
|
||||
net_query =
|
||||
remote_.is_web()
|
||||
? G()->net_query_creator().create(
|
||||
id,
|
||||
unique_id,
|
||||
telegram_api::upload_getWebFile(remote_.as_input_web_file_location(), narrow_cast<int32>(part.offset),
|
||||
narrow_cast<int32>(size)),
|
||||
{}, dc_id, net_query_type, NetQuery::AuthFlag::On)
|
||||
: G()->net_query_creator().create(
|
||||
id,
|
||||
unique_id,
|
||||
telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/,
|
||||
remote_.as_input_file_location(), part.offset, narrow_cast<int32>(size)),
|
||||
{}, dc_id, net_query_type, NetQuery::AuthFlag::On);
|
||||
|
@ -43,7 +43,7 @@ ActorOwn<ResourceManager> &FileLoadManager::get_download_resource_manager(bool i
|
||||
return actor;
|
||||
}
|
||||
|
||||
void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_location,
|
||||
void FileLoadManager::download(QueryId query_id, const FullRemoteFileLocation &remote_location,
|
||||
const LocalFileLocation &local, int64 size, string name,
|
||||
const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
|
||||
int8 priority) {
|
||||
@ -53,7 +53,7 @@ void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_
|
||||
NodeId node_id = nodes_container_.create(Node());
|
||||
Node *node = nodes_container_.get(node_id);
|
||||
CHECK(node);
|
||||
node->query_id_ = id;
|
||||
node->query_id_ = query_id;
|
||||
auto callback = make_unique<FileDownloaderCallback>(actor_shared(this, node_id));
|
||||
bool is_small = size < 20 * 1024;
|
||||
node->loader_ =
|
||||
@ -63,11 +63,11 @@ void FileLoadManager::download(QueryId id, const FullRemoteFileLocation &remote_
|
||||
auto &resource_manager = get_download_resource_manager(is_small, dc_id);
|
||||
send_closure(resource_manager, &ResourceManager::register_worker,
|
||||
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
|
||||
bool is_inserted = query_id_to_node_id_.emplace(id, node_id).second;
|
||||
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
|
||||
CHECK(is_inserted);
|
||||
}
|
||||
|
||||
void FileLoadManager::upload(QueryId id, const LocalFileLocation &local_location,
|
||||
void FileLoadManager::upload(QueryId query_id, const LocalFileLocation &local_location,
|
||||
const RemoteFileLocation &remote_location, int64 expected_size,
|
||||
const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts) {
|
||||
if (stop_flag_) {
|
||||
@ -76,17 +76,17 @@ void FileLoadManager::upload(QueryId id, const LocalFileLocation &local_location
|
||||
NodeId node_id = nodes_container_.create(Node());
|
||||
Node *node = nodes_container_.get(node_id);
|
||||
CHECK(node);
|
||||
node->query_id_ = id;
|
||||
node->query_id_ = query_id;
|
||||
auto callback = make_unique<FileUploaderCallback>(actor_shared(this, node_id));
|
||||
node->loader_ = create_actor<FileUploader>("Uploader", local_location, remote_location, expected_size, encryption_key,
|
||||
std::move(bad_parts), std::move(callback));
|
||||
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
|
||||
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
|
||||
bool is_inserted = query_id_to_node_id_.emplace(id, node_id).second;
|
||||
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
|
||||
CHECK(is_inserted);
|
||||
}
|
||||
|
||||
void FileLoadManager::upload_by_hash(QueryId id, const FullLocalFileLocation &local_location, int64 size,
|
||||
void FileLoadManager::upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size,
|
||||
int8 priority) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
@ -94,20 +94,20 @@ void FileLoadManager::upload_by_hash(QueryId id, const FullLocalFileLocation &lo
|
||||
NodeId node_id = nodes_container_.create(Node());
|
||||
Node *node = nodes_container_.get(node_id);
|
||||
CHECK(node);
|
||||
node->query_id_ = id;
|
||||
node->query_id_ = query_id;
|
||||
auto callback = make_unique<FileHashUploaderCallback>(actor_shared(this, node_id));
|
||||
node->loader_ = create_actor<FileHashUploader>("HashUploader", local_location, size, std::move(callback));
|
||||
send_closure(upload_resource_manager_, &ResourceManager::register_worker,
|
||||
ActorShared<FileLoaderActor>(node->loader_.get(), static_cast<uint64>(-1)), priority);
|
||||
bool is_inserted = query_id_to_node_id_.emplace(id, node_id).second;
|
||||
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
|
||||
CHECK(is_inserted);
|
||||
}
|
||||
|
||||
void FileLoadManager::update_priority(QueryId id, int8 priority) {
|
||||
void FileLoadManager::update_priority(QueryId query_id, int8 priority) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
auto it = query_id_to_node_id_.find(id);
|
||||
auto it = query_id_to_node_id_.find(query_id);
|
||||
if (it == query_id_to_node_id_.end()) {
|
||||
return;
|
||||
}
|
||||
@ -118,18 +118,18 @@ void FileLoadManager::update_priority(QueryId id, int8 priority) {
|
||||
send_closure(node->loader_, &FileLoaderActor::update_priority, priority);
|
||||
}
|
||||
|
||||
void FileLoadManager::from_bytes(QueryId id, FileType type, BufferSlice bytes, string name) {
|
||||
void FileLoadManager::from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
NodeId node_id = nodes_container_.create(Node());
|
||||
Node *node = nodes_container_.get(node_id);
|
||||
CHECK(node);
|
||||
node->query_id_ = id;
|
||||
node->query_id_ = query_id;
|
||||
auto callback = make_unique<FileFromBytesCallback>(actor_shared(this, node_id));
|
||||
node->loader_ =
|
||||
create_actor<FileFromBytes>("FromBytes", type, std::move(bytes), std::move(name), std::move(callback));
|
||||
bool is_inserted = query_id_to_node_id_.emplace(id, node_id).second;
|
||||
bool is_inserted = query_id_to_node_id_.emplace(query_id, node_id).second;
|
||||
CHECK(is_inserted);
|
||||
}
|
||||
|
||||
@ -160,23 +160,21 @@ void FileLoadManager::check_partial_local_location(PartialLocalFileLocation part
|
||||
}
|
||||
}
|
||||
|
||||
// void upload_reload_parts(QueryId id, vector<int32> parts);
|
||||
// void upload_restart(QueryId id);
|
||||
void FileLoadManager::cancel(QueryId id) {
|
||||
void FileLoadManager::cancel(QueryId query_id) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
auto it = query_id_to_node_id_.find(id);
|
||||
auto it = query_id_to_node_id_.find(query_id);
|
||||
if (it == query_id_to_node_id_.end()) {
|
||||
return;
|
||||
}
|
||||
on_error_impl(it->second, Status::Error(-1, "Canceled"));
|
||||
}
|
||||
void FileLoadManager::update_local_file_location(QueryId id, const LocalFileLocation &local) {
|
||||
void FileLoadManager::update_local_file_location(QueryId query_id, const LocalFileLocation &local) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
auto it = query_id_to_node_id_.find(id);
|
||||
auto it = query_id_to_node_id_.find(query_id);
|
||||
if (it == query_id_to_node_id_.end()) {
|
||||
return;
|
||||
}
|
||||
@ -187,11 +185,11 @@ void FileLoadManager::update_local_file_location(QueryId id, const LocalFileLoca
|
||||
send_closure(node->loader_, &FileLoaderActor::update_local_file_location, local);
|
||||
}
|
||||
|
||||
void FileLoadManager::update_downloaded_part(QueryId id, int64 offset, int64 limit) {
|
||||
void FileLoadManager::update_downloaded_part(QueryId query_id, int64 offset, int64 limit) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
auto it = query_id_to_node_id_.find(id);
|
||||
auto it = query_id_to_node_id_.find(query_id);
|
||||
if (it == query_id_to_node_id_.end()) {
|
||||
return;
|
||||
}
|
||||
@ -203,7 +201,7 @@ void FileLoadManager::update_downloaded_part(QueryId id, int64 offset, int64 lim
|
||||
}
|
||||
|
||||
void FileLoadManager::hangup() {
|
||||
nodes_container_.for_each([](auto id, auto &node) { node.loader_.reset(); });
|
||||
nodes_container_.for_each([](auto query_id, auto &node) { node.loader_.reset(); });
|
||||
stop_flag_ = true;
|
||||
loop();
|
||||
}
|
||||
|
@ -34,30 +34,30 @@ class FileLoadManager final : public Actor {
|
||||
using QueryId = uint64;
|
||||
class Callback : public Actor {
|
||||
public:
|
||||
virtual void on_start_download(QueryId id) = 0;
|
||||
virtual void on_partial_download(QueryId id, PartialLocalFileLocation partial_local, int64 ready_size,
|
||||
virtual void on_start_download(QueryId query_id) = 0;
|
||||
virtual void on_partial_download(QueryId query_id, PartialLocalFileLocation partial_local, int64 ready_size,
|
||||
int64 size) = 0;
|
||||
virtual void on_partial_upload(QueryId id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0;
|
||||
virtual void on_hash(QueryId id, string hash) = 0;
|
||||
virtual void on_upload_ok(QueryId id, FileType file_type, PartialRemoteFileLocation remtoe, int64 size) = 0;
|
||||
virtual void on_upload_full_ok(QueryId id, FullRemoteFileLocation remote) = 0;
|
||||
virtual void on_download_ok(QueryId id, FullLocalFileLocation local, int64 size, bool is_new) = 0;
|
||||
virtual void on_error(QueryId id, Status status) = 0;
|
||||
virtual void on_partial_upload(QueryId query_id, PartialRemoteFileLocation partial_remote, int64 ready_size) = 0;
|
||||
virtual void on_hash(QueryId query_id, string hash) = 0;
|
||||
virtual void on_upload_ok(QueryId query_id, FileType file_type, PartialRemoteFileLocation remtoe, int64 size) = 0;
|
||||
virtual void on_upload_full_ok(QueryId query_id, FullRemoteFileLocation remote) = 0;
|
||||
virtual void on_download_ok(QueryId query_id, FullLocalFileLocation local, int64 size, bool is_new) = 0;
|
||||
virtual void on_error(QueryId query_id, Status status) = 0;
|
||||
};
|
||||
|
||||
explicit FileLoadManager(ActorShared<Callback> callback, ActorShared<> parent);
|
||||
|
||||
void download(QueryId id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local, int64 size,
|
||||
string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset, int64 limit,
|
||||
int8 priority);
|
||||
void upload(QueryId id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
|
||||
void download(QueryId query_id, const FullRemoteFileLocation &remote_location, const LocalFileLocation &local,
|
||||
int64 size, string name, const FileEncryptionKey &encryption_key, bool search_file, int64 offset,
|
||||
int64 limit, int8 priority);
|
||||
void upload(QueryId query_id, const LocalFileLocation &local_location, const RemoteFileLocation &remote_location,
|
||||
int64 expected_size, const FileEncryptionKey &encryption_key, int8 priority, vector<int> bad_parts);
|
||||
void upload_by_hash(QueryId id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
|
||||
void update_priority(QueryId id, int8 priority);
|
||||
void from_bytes(QueryId id, FileType type, BufferSlice bytes, string name);
|
||||
void cancel(QueryId id);
|
||||
void update_local_file_location(QueryId id, const LocalFileLocation &local);
|
||||
void update_downloaded_part(QueryId id, int64 offset, int64 limit);
|
||||
void upload_by_hash(QueryId query_id, const FullLocalFileLocation &local_location, int64 size, int8 priority);
|
||||
void update_priority(QueryId query_id, int8 priority);
|
||||
void from_bytes(QueryId query_id, FileType type, BufferSlice bytes, string name);
|
||||
void cancel(QueryId query_id);
|
||||
void update_local_file_location(QueryId query_id, const LocalFileLocation &local);
|
||||
void update_downloaded_part(QueryId query_id, int64 offset, int64 limit);
|
||||
|
||||
void get_content(string file_path, Promise<BufferSlice> promise);
|
||||
|
||||
|
@ -211,15 +211,15 @@ Status FileLoader::do_loop() {
|
||||
NetQueryPtr query;
|
||||
bool is_blocking;
|
||||
std::tie(query, is_blocking) = std::move(query_flag);
|
||||
uint64 id = UniqueId::next();
|
||||
uint64 unique_id = UniqueId::next();
|
||||
if (is_blocking) {
|
||||
CHECK(blocking_id_ == 0);
|
||||
blocking_id_ = id;
|
||||
blocking_id_ = unique_id;
|
||||
}
|
||||
part_map_[id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
|
||||
// part_map_[id] = std::make_pair(part, query.get_weak());
|
||||
part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
|
||||
// part_map_[unique_id] = std::make_pair(part, query.get_weak());
|
||||
|
||||
auto callback = actor_shared(this, id);
|
||||
auto callback = actor_shared(this, unique_id);
|
||||
if (delay_dispatcher_.empty()) {
|
||||
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
|
||||
} else {
|
||||
@ -259,15 +259,15 @@ void FileLoader::on_result(NetQueryPtr query) {
|
||||
if (stop_flag_) {
|
||||
return;
|
||||
}
|
||||
auto id = get_link_token();
|
||||
if (id == blocking_id_) {
|
||||
auto unique_id = get_link_token();
|
||||
if (unique_id == blocking_id_) {
|
||||
blocking_id_ = 0;
|
||||
}
|
||||
if (UniqueId::extract_key(id) == COMMON_QUERY_KEY) {
|
||||
if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) {
|
||||
on_common_query(std::move(query));
|
||||
return loop();
|
||||
}
|
||||
auto it = part_map_.find(id);
|
||||
auto it = part_map_.find(unique_id);
|
||||
if (it == part_map_.end()) {
|
||||
LOG(WARNING) << "Got result for unknown part";
|
||||
return;
|
||||
|
@ -1957,7 +1957,7 @@ void FileManager::flush_to_pmc(FileNodePtr node, bool new_remote, bool new_local
|
||||
bool create_flag = false;
|
||||
if (node->pmc_id_.empty()) {
|
||||
create_flag = true;
|
||||
node->pmc_id_ = file_db_->create_pmc_id();
|
||||
node->pmc_id_ = file_db_->get_next_file_db_id();
|
||||
}
|
||||
|
||||
FileData data;
|
||||
@ -2113,10 +2113,10 @@ bool FileManager::set_content(FileId file_id, BufferSlice bytes) {
|
||||
|
||||
node->set_download_priority(FROM_BYTES_PRIORITY);
|
||||
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::SetContent});
|
||||
node->download_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::SetContent});
|
||||
node->download_id_ = query_id;
|
||||
node->is_download_started_ = true;
|
||||
send_closure(file_load_manager_, &FileLoadManager::from_bytes, id, node->remote_.full.value().file_type_,
|
||||
send_closure(file_load_manager_, &FileLoadManager::from_bytes, query_id, node->remote_.full.value().file_type_,
|
||||
std::move(bytes), node->suggested_path());
|
||||
return true;
|
||||
}
|
||||
@ -2422,10 +2422,10 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
|
||||
|
||||
if (node->need_reload_photo_ && file_view.may_reload_photo()) {
|
||||
LOG(INFO) << "Reload photo from file " << node->main_file_id_;
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::DownloadReloadDialog});
|
||||
node->download_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::DownloadReloadDialog});
|
||||
node->download_id_ = query_id;
|
||||
context_->reload_photo(file_view.remote_location().get_source(),
|
||||
PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||
PromiseCreator::lambda([query_id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||
Status error;
|
||||
if (res.is_ok()) {
|
||||
error = Status::Error("FILE_DOWNLOAD_ID_INVALID");
|
||||
@ -2434,7 +2434,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
|
||||
}
|
||||
VLOG(file_references)
|
||||
<< "Got result from reload photo for file " << file_id << ": " << error;
|
||||
send_closure(actor_id, &FileManager::on_error, id, std::move(error));
|
||||
send_closure(actor_id, &FileManager::on_error, query_id, std::move(error));
|
||||
}));
|
||||
node->need_reload_photo_ = false;
|
||||
return;
|
||||
@ -2443,16 +2443,16 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
|
||||
// If file reference is needed
|
||||
if (!file_view.has_active_download_remote_location()) {
|
||||
VLOG(file_references) << "Do not have valid file_reference for file " << file_id;
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference});
|
||||
node->download_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::DownloadWaitFileReference});
|
||||
node->download_id_ = query_id;
|
||||
if (node->download_was_update_file_reference_) {
|
||||
on_error(id, Status::Error("Can't download file: have no valid file reference"));
|
||||
on_error(query_id, Status::Error("Can't download file: have no valid file reference"));
|
||||
return;
|
||||
}
|
||||
node->download_was_update_file_reference_ = true;
|
||||
|
||||
context_->repair_file_reference(
|
||||
file_id, PromiseCreator::lambda([id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||
file_id, PromiseCreator::lambda([query_id, actor_id = actor_id(this), file_id](Result<Unit> res) {
|
||||
Status error;
|
||||
if (res.is_ok()) {
|
||||
error = Status::Error("FILE_DOWNLOAD_RESTART_WITH_FILE_REFERENCE");
|
||||
@ -2460,13 +2460,13 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
|
||||
error = res.move_as_error();
|
||||
}
|
||||
VLOG(file_references) << "Got result from FileSourceManager for file " << file_id << ": " << error;
|
||||
send_closure(actor_id, &FileManager::on_error, id, std::move(error));
|
||||
send_closure(actor_id, &FileManager::on_error, query_id, std::move(error));
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::Download});
|
||||
node->download_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Download});
|
||||
node->download_id_ = query_id;
|
||||
node->is_download_started_ = false;
|
||||
LOG(INFO) << "Run download of file " << file_id << " of size " << node->size_ << " from "
|
||||
<< node->remote_.full.value() << " with suggested name " << node->suggested_path() << " and encyption key "
|
||||
@ -2479,7 +2479,7 @@ void FileManager::run_download(FileNodePtr node, bool force_update_priority) {
|
||||
download_limit += download_offset;
|
||||
download_offset = 0;
|
||||
}
|
||||
send_closure(file_load_manager_, &FileLoadManager::download, id, node->remote_.full.value(), node->local_,
|
||||
send_closure(file_load_manager_, &FileLoadManager::download, query_id, node->remote_.full.value(), node->local_,
|
||||
node->size_, node->suggested_path(), node->encryption_key_, node->can_search_locally_, download_offset,
|
||||
download_limit, priority);
|
||||
}
|
||||
@ -2776,20 +2776,20 @@ void FileManager::delete_file_reference(FileId file_id, Slice file_reference) {
|
||||
try_flush_node_pmc(node, "delete_file_reference");
|
||||
}
|
||||
|
||||
void FileManager::external_file_generate_write_part(int64 id, int64 offset, string data, Promise<> promise) {
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_write_part, id, offset,
|
||||
std::move(data), std::move(promise));
|
||||
void FileManager::external_file_generate_write_part(int64 generation_id, int64 offset, string data, Promise<> promise) {
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_write_part,
|
||||
static_cast<uint64>(generation_id), offset, std::move(data), std::move(promise));
|
||||
}
|
||||
|
||||
void FileManager::external_file_generate_progress(int64 id, int64 expected_size, int64 local_prefix_size,
|
||||
void FileManager::external_file_generate_progress(int64 generation_id, int64 expected_size, int64 local_prefix_size,
|
||||
Promise<> promise) {
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_progress, id, expected_size,
|
||||
local_prefix_size, std::move(promise));
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_progress,
|
||||
static_cast<uint64>(generation_id), expected_size, local_prefix_size, std::move(promise));
|
||||
}
|
||||
|
||||
void FileManager::external_file_generate_finish(int64 id, Status status, Promise<> promise) {
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_finish, id, std::move(status),
|
||||
std::move(promise));
|
||||
void FileManager::external_file_generate_finish(int64 generation_id, Status status, Promise<> promise) {
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::external_file_generate_finish,
|
||||
static_cast<uint64>(generation_id), std::move(status), std::move(promise));
|
||||
}
|
||||
|
||||
void FileManager::run_generate(FileNodePtr node) {
|
||||
@ -2846,20 +2846,20 @@ void FileManager::run_generate(FileNodePtr node) {
|
||||
return;
|
||||
}
|
||||
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::Generate});
|
||||
node->generate_id_ = id;
|
||||
send_closure(file_generate_manager_, &FileGenerateManager::generate_file, id, *node->generate_, node->local_,
|
||||
node->suggested_path(), [file_manager = this, id] {
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Generate});
|
||||
node->generate_id_ = query_id;
|
||||
send_closure(
|
||||
file_generate_manager_, &FileGenerateManager::generate_file, query_id, *node->generate_, node->local_,
|
||||
node->suggested_path(), [file_manager = this, query_id] {
|
||||
class Callback final : public FileGenerateCallback {
|
||||
ActorId<FileManager> actor_;
|
||||
uint64 query_id_;
|
||||
|
||||
public:
|
||||
Callback(ActorId<FileManager> actor, QueryId id) : actor_(std::move(actor)), query_id_(id) {
|
||||
Callback(ActorId<FileManager> actor, QueryId query_id) : actor_(std::move(actor)), query_id_(query_id) {
|
||||
}
|
||||
void on_partial_generate(PartialLocalFileLocation partial_local, int64 expected_size) final {
|
||||
send_closure(actor_, &FileManager::on_partial_generate, query_id_, std::move(partial_local),
|
||||
expected_size);
|
||||
send_closure(actor_, &FileManager::on_partial_generate, query_id_, std::move(partial_local), expected_size);
|
||||
}
|
||||
void on_ok(FullLocalFileLocation local) final {
|
||||
send_closure(actor_, &FileManager::on_generate_ok, query_id_, std::move(local));
|
||||
@ -2868,7 +2868,7 @@ void FileManager::run_generate(FileNodePtr node) {
|
||||
send_closure(actor_, &FileManager::on_error, query_id_, std::move(error));
|
||||
}
|
||||
};
|
||||
return make_unique<Callback>(file_manager->actor_id(file_manager), id);
|
||||
return make_unique<Callback>(file_manager->actor_id(file_manager), query_id);
|
||||
}());
|
||||
|
||||
LOG(INFO) << "File " << file_id << " generate request has sent to FileGenerateManager";
|
||||
@ -2948,27 +2948,28 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
|
||||
CHECK(node->upload_id_ == 0);
|
||||
if (file_view.has_alive_remote_location() && !file_view.has_active_upload_remote_location() &&
|
||||
can_reuse_remote_file(file_view.get_type())) {
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference});
|
||||
node->upload_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadWaitFileReference});
|
||||
node->upload_id_ = query_id;
|
||||
if (node->upload_was_update_file_reference_) {
|
||||
on_error(id, Status::Error("Can't upload file: have no valid file reference"));
|
||||
on_error(query_id, Status::Error("Can't upload file: have no valid file reference"));
|
||||
return;
|
||||
}
|
||||
node->upload_was_update_file_reference_ = true;
|
||||
|
||||
context_->repair_file_reference(
|
||||
node->main_file_id_, PromiseCreator::lambda([id, actor_id = actor_id(this)](Result<Unit> res) {
|
||||
send_closure(actor_id, &FileManager::on_error, id, Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE"));
|
||||
context_->repair_file_reference(node->main_file_id_,
|
||||
PromiseCreator::lambda([query_id, actor_id = actor_id(this)](Result<Unit> res) {
|
||||
send_closure(actor_id, &FileManager::on_error, query_id,
|
||||
Status::Error("FILE_UPLOAD_RESTART_WITH_FILE_REFERENCE"));
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!node->remote_.partial && node->get_by_hash_) {
|
||||
LOG(INFO) << "Get file " << node->main_file_id_ << " by hash";
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::UploadByHash});
|
||||
node->upload_id_ = id;
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::UploadByHash});
|
||||
node->upload_id_ = query_id;
|
||||
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload_by_hash, id, node->local_.full(), node->size_,
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload_by_hash, query_id, node->local_.full(), node->size_,
|
||||
narrow_cast<int8>(-priority));
|
||||
return;
|
||||
}
|
||||
@ -2981,9 +2982,9 @@ void FileManager::run_upload(FileNodePtr node, vector<int> bad_parts) {
|
||||
expected_size = 10 << 20;
|
||||
}
|
||||
|
||||
QueryId id = queries_container_.create(Query{file_id, Query::Type::Upload});
|
||||
node->upload_id_ = id;
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload, id, node->local_, node->remote_.partial_or_empty(),
|
||||
QueryId query_id = queries_container_.create(Query{file_id, Query::Type::Upload});
|
||||
node->upload_id_ = query_id;
|
||||
send_closure(file_load_manager_, &FileLoadManager::upload, query_id, node->local_, node->remote_.partial_or_empty(),
|
||||
expected_size, node->encryption_key_, new_priority, std::move(bad_parts));
|
||||
|
||||
LOG(INFO) << "File " << file_id << " upload request has sent to FileLoadManager";
|
||||
@ -4078,9 +4079,9 @@ void FileManager::hangup() {
|
||||
file_generate_manager_.reset();
|
||||
file_load_manager_.reset();
|
||||
while (!queries_container_.empty()) {
|
||||
auto ids = queries_container_.ids();
|
||||
for (auto id : ids) {
|
||||
on_error(id, Global::request_aborted_error());
|
||||
auto query_ids = queries_container_.ids();
|
||||
for (auto query_id : query_ids) {
|
||||
on_error(query_id, Global::request_aborted_error());
|
||||
}
|
||||
}
|
||||
is_closed_ = true;
|
||||
|
@ -470,9 +470,10 @@ class FileManager final : public FileLoadManager::Callback {
|
||||
|
||||
void delete_file(FileId file_id, Promise<Unit> promise, const char *source);
|
||||
|
||||
void external_file_generate_write_part(int64 id, int64 offset, string data, Promise<> promise);
|
||||
void external_file_generate_progress(int64 id, int64 expected_size, int64 local_prefix_size, Promise<> promise);
|
||||
void external_file_generate_finish(int64 id, Status status, Promise<> promise);
|
||||
void external_file_generate_write_part(int64 generation_id, int64 offset, string data, Promise<> promise);
|
||||
void external_file_generate_progress(int64 generation_id, int64 expected_size, int64 local_prefix_size,
|
||||
Promise<> promise);
|
||||
void external_file_generate_finish(int64 generation_id, Status status, Promise<> promise);
|
||||
|
||||
Result<FileId> from_persistent_id(CSlice persistent_id, FileType file_type) TD_WARN_UNUSED_RESULT;
|
||||
FileView get_file_view(FileId file_id) const;
|
||||
@ -659,7 +660,6 @@ class FileManager final : public FileLoadManager::Callback {
|
||||
|
||||
void on_force_reupload_success(FileId file_id);
|
||||
|
||||
// void release_file_node(FileNodeId id);
|
||||
void do_cancel_download(FileNodePtr node);
|
||||
void do_cancel_upload(FileNodePtr node);
|
||||
void do_cancel_generate(FileNodePtr node);
|
||||
|
@ -46,17 +46,17 @@ int32 PartsManager::set_streaming_offset(int64 offset, int64 limit) {
|
||||
return finish();
|
||||
}
|
||||
|
||||
auto part_i = offset / part_size_;
|
||||
if (use_part_count_limit_ && part_i >= MAX_PART_COUNT_PREMIUM) {
|
||||
auto part_id = offset / part_size_;
|
||||
if (use_part_count_limit_ && part_id >= MAX_PART_COUNT_PREMIUM) {
|
||||
streaming_offset_ = 0;
|
||||
LOG(ERROR) << "Ignore streaming_offset " << offset << " in part " << part_i;
|
||||
LOG(ERROR) << "Ignore streaming_offset " << offset << " in part " << part_id;
|
||||
|
||||
return finish();
|
||||
}
|
||||
|
||||
streaming_offset_ = offset;
|
||||
first_streaming_empty_part_ = narrow_cast<int>(part_i);
|
||||
first_streaming_not_ready_part_ = narrow_cast<int>(part_i);
|
||||
first_streaming_empty_part_ = narrow_cast<int>(part_id);
|
||||
first_streaming_not_ready_part_ = narrow_cast<int>(part_id);
|
||||
if (part_count_ < first_streaming_empty_part_) {
|
||||
part_count_ = first_streaming_empty_part_;
|
||||
part_status_.resize(part_count_, PartStatus::Empty);
|
||||
@ -75,9 +75,9 @@ void PartsManager::set_streaming_limit(int64 limit) {
|
||||
if (streaming_limit_ == 0) {
|
||||
return;
|
||||
}
|
||||
for (int part_i = 0; part_i < part_count_; part_i++) {
|
||||
if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
|
||||
streaming_ready_size_ += get_part(part_i).size;
|
||||
for (int part_id = 0; part_id < part_count_; part_id++) {
|
||||
if (is_part_in_streaming_limit(part_id) && part_status_[part_id] == PartStatus::Ready) {
|
||||
streaming_ready_size_ += get_part(part_id).size;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -231,10 +231,10 @@ string PartsManager::get_bitmask() {
|
||||
return bitmask_.encode(prefix_count);
|
||||
}
|
||||
|
||||
bool PartsManager::is_part_in_streaming_limit(int part_i) const {
|
||||
CHECK(part_i < part_count_);
|
||||
auto offset_begin = static_cast<int64>(part_i) * static_cast<int64>(get_part_size());
|
||||
auto offset_end = offset_begin + static_cast<int64>(get_part(part_i).size);
|
||||
bool PartsManager::is_part_in_streaming_limit(int part_id) const {
|
||||
CHECK(part_id < part_count_);
|
||||
auto offset_begin = static_cast<int64>(part_id) * static_cast<int64>(get_part_size());
|
||||
auto offset_end = offset_begin + static_cast<int64>(get_part(part_id).size);
|
||||
|
||||
if (offset_begin >= get_expected_size()) {
|
||||
return false;
|
||||
@ -265,25 +265,25 @@ bool PartsManager::is_streaming_limit_reached() {
|
||||
return false;
|
||||
}
|
||||
update_first_not_ready_part();
|
||||
auto part_i = first_streaming_not_ready_part_;
|
||||
auto part_id = first_streaming_not_ready_part_;
|
||||
|
||||
// wrap
|
||||
if (!unknown_size_flag_ && part_i == part_count_) {
|
||||
part_i = first_not_ready_part_;
|
||||
if (!unknown_size_flag_ && part_id == part_count_) {
|
||||
part_id = first_not_ready_part_;
|
||||
}
|
||||
if (part_i == part_count_) {
|
||||
if (part_id == part_count_) {
|
||||
return false;
|
||||
}
|
||||
return !is_part_in_streaming_limit(part_i);
|
||||
return !is_part_in_streaming_limit(part_id);
|
||||
}
|
||||
|
||||
Result<Part> PartsManager::start_part() {
|
||||
update_first_empty_part();
|
||||
auto part_i = first_streaming_empty_part_;
|
||||
if (known_prefix_flag_ && part_i >= static_cast<int>(known_prefix_size_ / part_size_)) {
|
||||
auto part_id = first_streaming_empty_part_;
|
||||
if (known_prefix_flag_ && part_id >= static_cast<int>(known_prefix_size_ / part_size_)) {
|
||||
return Status::Error(-1, "Wait for prefix to be known");
|
||||
}
|
||||
if (part_i == part_count_) {
|
||||
if (part_id == part_count_) {
|
||||
if (unknown_size_flag_) {
|
||||
part_count_++;
|
||||
if (part_count_ > MAX_PART_COUNT_PREMIUM + (use_part_count_limit_ ? 0 : 64)) {
|
||||
@ -296,19 +296,19 @@ Result<Part> PartsManager::start_part() {
|
||||
part_status_.push_back(PartStatus::Empty);
|
||||
} else {
|
||||
if (first_empty_part_ < part_count_) {
|
||||
part_i = first_empty_part_;
|
||||
part_id = first_empty_part_;
|
||||
} else {
|
||||
return get_empty_part();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!is_part_in_streaming_limit(part_i)) {
|
||||
if (!is_part_in_streaming_limit(part_id)) {
|
||||
return get_empty_part();
|
||||
}
|
||||
CHECK(part_status_[part_i] == PartStatus::Empty);
|
||||
on_part_start(part_i);
|
||||
return get_part(part_i);
|
||||
CHECK(part_status_[part_id] == PartStatus::Empty);
|
||||
on_part_start(part_id);
|
||||
return get_part(part_id);
|
||||
}
|
||||
|
||||
Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
|
||||
@ -341,22 +341,23 @@ Status PartsManager::set_known_prefix(size_t size, bool is_ready) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size) {
|
||||
CHECK(part_status_[id] == PartStatus::Pending);
|
||||
Status PartsManager::on_part_ok(int part_id, size_t part_size, size_t actual_size) {
|
||||
CHECK(part_status_[part_id] == PartStatus::Pending);
|
||||
pending_count_--;
|
||||
|
||||
part_status_[id] = PartStatus::Ready;
|
||||
part_status_[part_id] = PartStatus::Ready;
|
||||
if (actual_size != 0) {
|
||||
bitmask_.set(id);
|
||||
bitmask_.set(part_id);
|
||||
}
|
||||
ready_size_ += narrow_cast<int64>(actual_size);
|
||||
if (streaming_limit_ > 0 && is_part_in_streaming_limit(id)) {
|
||||
if (streaming_limit_ > 0 && is_part_in_streaming_limit(part_id)) {
|
||||
streaming_ready_size_ += narrow_cast<int64>(actual_size);
|
||||
}
|
||||
|
||||
VLOG(file_loader) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;
|
||||
VLOG(file_loader) << "Transferred part " << part_id << " of size " << part_size
|
||||
<< ", total ready size = " << ready_size_;
|
||||
|
||||
int64 offset = narrow_cast<int64>(part_size_) * id;
|
||||
int64 offset = narrow_cast<int64>(part_size_) * part_id;
|
||||
int64 end_offset = offset + narrow_cast<int64>(actual_size);
|
||||
if (unknown_size_flag_) {
|
||||
CHECK(part_size == part_size_);
|
||||
@ -386,20 +387,20 @@ Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void PartsManager::on_part_failed(int32 id) {
|
||||
CHECK(part_status_[id] == PartStatus::Pending);
|
||||
void PartsManager::on_part_failed(int32 part_id) {
|
||||
CHECK(part_status_[part_id] == PartStatus::Pending);
|
||||
pending_count_--;
|
||||
part_status_[id] = PartStatus::Empty;
|
||||
if (id < first_empty_part_) {
|
||||
first_empty_part_ = id;
|
||||
part_status_[part_id] = PartStatus::Empty;
|
||||
if (part_id < first_empty_part_) {
|
||||
first_empty_part_ = part_id;
|
||||
}
|
||||
if (streaming_offset_ == 0) {
|
||||
first_streaming_empty_part_ = id;
|
||||
first_streaming_empty_part_ = part_id;
|
||||
return;
|
||||
}
|
||||
auto part_i = narrow_cast<int>(streaming_offset_ / part_size_);
|
||||
if (id >= part_i && id < first_streaming_empty_part_) {
|
||||
first_streaming_empty_part_ = id;
|
||||
auto offset_part_id = narrow_cast<int>(streaming_offset_ / part_size_);
|
||||
if (part_id >= offset_part_id && part_id < first_streaming_empty_part_) {
|
||||
first_streaming_empty_part_ = part_id;
|
||||
}
|
||||
}
|
||||
|
||||
@ -442,9 +443,9 @@ int64 PartsManager::get_estimated_extra() const {
|
||||
|
||||
//TODO: delete this block if CHECK won't fail
|
||||
int64 sub = 0;
|
||||
for (int part_i = 0; part_i < part_count_; part_i++) {
|
||||
if (is_part_in_streaming_limit(part_i) && part_status_[part_i] == PartStatus::Ready) {
|
||||
sub += get_part(part_i).size;
|
||||
for (int part_id = 0; part_id < part_count_; part_id++) {
|
||||
if (is_part_in_streaming_limit(part_id) && part_status_[part_id] == PartStatus::Ready) {
|
||||
sub += get_part(part_id).size;
|
||||
}
|
||||
}
|
||||
CHECK(sub == streaming_ready_size_);
|
||||
@ -536,25 +537,25 @@ int64 PartsManager::get_unchecked_ready_prefix_size() {
|
||||
return res;
|
||||
}
|
||||
|
||||
Part PartsManager::get_part(int id) const {
|
||||
Part PartsManager::get_part(int part_id) const {
|
||||
auto size = narrow_cast<int64>(part_size_);
|
||||
auto offset = size * id;
|
||||
auto offset = size * part_id;
|
||||
auto total_size = unknown_size_flag_ ? max_size_ : get_size();
|
||||
if (total_size < offset) {
|
||||
size = 0;
|
||||
} else {
|
||||
size = min(size, total_size - offset);
|
||||
}
|
||||
return Part{id, offset, static_cast<size_t>(size)};
|
||||
return Part{part_id, offset, static_cast<size_t>(size)};
|
||||
}
|
||||
|
||||
Part PartsManager::get_empty_part() {
|
||||
return Part{-1, 0, 0};
|
||||
}
|
||||
|
||||
void PartsManager::on_part_start(int32 id) {
|
||||
CHECK(part_status_[id] == PartStatus::Empty);
|
||||
part_status_[id] = PartStatus::Pending;
|
||||
void PartsManager::on_part_start(int32 part_id) {
|
||||
CHECK(part_status_[part_id] == PartStatus::Empty);
|
||||
part_status_[part_id] = PartStatus::Pending;
|
||||
pending_count_++;
|
||||
}
|
||||
|
||||
|
@ -30,8 +30,8 @@ class PartsManager {
|
||||
|
||||
// returns empty part if nothing to return
|
||||
Result<Part> start_part() TD_WARN_UNUSED_RESULT;
|
||||
Status on_part_ok(int32 id, size_t part_size, size_t actual_size) TD_WARN_UNUSED_RESULT;
|
||||
void on_part_failed(int32 id);
|
||||
Status on_part_ok(int part_id, size_t part_size, size_t actual_size) TD_WARN_UNUSED_RESULT;
|
||||
void on_part_failed(int part_id);
|
||||
Status set_known_prefix(size_t size, bool is_ready);
|
||||
void set_need_check();
|
||||
void set_checked_prefix_size(int64 size);
|
||||
@ -96,13 +96,13 @@ class PartsManager {
|
||||
|
||||
static Part get_empty_part();
|
||||
|
||||
Part get_part(int id) const;
|
||||
void on_part_start(int32 id);
|
||||
Part get_part(int part_id) const;
|
||||
void on_part_start(int part_id);
|
||||
void update_first_empty_part();
|
||||
void update_first_not_ready_part();
|
||||
|
||||
bool is_streaming_limit_reached();
|
||||
bool is_part_in_streaming_limit(int part_i) const;
|
||||
bool is_part_in_streaming_limit(int part_id) const;
|
||||
};
|
||||
|
||||
} // namespace td
|
||||
|
Loading…
Reference in New Issue
Block a user