Move read_file_part to FileLoadManager thread.

This commit is contained in:
levlam 2022-06-16 00:25:47 +03:00
parent 62a97d8eb6
commit ed7b8f3a19
3 changed files with 32 additions and 31 deletions

View File

@ -14,6 +14,7 @@
#include "td/utils/common.h"
#include "td/utils/filesystem.h"
#include "td/utils/format.h"
#include "td/utils/misc.h"
#include "td/utils/SliceBuilder.h"
namespace td {
@ -136,6 +137,10 @@ void FileLoadManager::get_content(string file_path, Promise<BufferSlice> promise
promise.set_result(read_file(file_path));
}
void FileLoadManager::read_file_part(string file_path, int64 offset, int64 count, Promise<string> promise) {
promise.set_result(read_file_str(file_path, count, offset));
}
// void upload_reload_parts(QueryId id, vector<int32> parts);
// void upload_restart(QueryId id);
void FileLoadManager::cancel(QueryId id) {

View File

@ -59,6 +59,8 @@ class FileLoadManager final : public Actor {
void get_content(string file_path, Promise<BufferSlice> promise);
void read_file_part(string file_path, int64 offset, int64 count, Promise<string> promise);
private:
struct Node {
QueryId query_id_;

View File

@ -2103,38 +2103,32 @@ void FileManager::read_file_part(FileId file_id, int64 offset, int64 count, int
is_partial = true;
}
// TODO move file reading to another thread
auto r_bytes = [&]() -> Result<string> {
TRY_RESULT(fd, FileFd::open(*path, FileFd::Read));
string data;
data.resize(narrow_cast<size_t>(count));
TRY_RESULT(read_bytes, fd.pread(data, offset));
if (read_bytes != static_cast<size_t>(count)) {
return Status::Error("Read less bytes than expected");
}
return std::move(data);
}();
if (r_bytes.is_error()) {
LOG(INFO) << "Failed to read file bytes: " << r_bytes.error();
if (--left_tries == 0 || !is_partial) {
return promise.set_error(Status::Error(400, "Failed to read the file"));
}
auto read_file_part_promise =
PromiseCreator::lambda([actor_id = actor_id(this), file_id, offset, count, left_tries, is_partial,
promise = std::move(promise)](Result<string> r_bytes) mutable {
if (r_bytes.is_error()) {
LOG(INFO) << "Failed to read file bytes: " << r_bytes.error();
if (left_tries == 1 || !is_partial) {
return promise.set_error(Status::Error(400, "Failed to read the file"));
}
// the temporary file could be moved from temp to persistent folder
// we need to wait for the corresponding update and repeat the reading
create_actor<SleepActor>("RepeatReadFilePartActor", 0.01,
PromiseCreator::lambda([actor_id = actor_id(this), file_id, offset, count, left_tries,
promise = std::move(promise)](Result<Unit> result) mutable {
send_closure(actor_id, &FileManager::read_file_part, file_id, offset, count, left_tries,
std::move(promise));
}))
.release();
return;
}
auto result = td_api::make_object<td_api::filePart>();
result->data_ = r_bytes.move_as_ok();
promise.set_value(std::move(result));
// the temporary file could be moved from temp to persistent directory
// we need to wait for the corresponding update and repeat the reading
create_actor<SleepActor>("RepeatReadFilePartActor", 0.01,
PromiseCreator::lambda([actor_id, file_id, offset, count, left_tries,
promise = std::move(promise)](Result<Unit> result) mutable {
send_closure(actor_id, &FileManager::read_file_part, file_id, offset, count,
left_tries - 1, std::move(promise));
}))
.release();
} else {
auto result = td_api::make_object<td_api::filePart>();
result->data_ = r_bytes.move_as_ok();
promise.set_value(std::move(result));
}
});
send_closure(file_load_manager_, &FileLoadManager::read_file_part, *path, offset, count,
std::move(read_file_part_promise));
}
void FileManager::delete_file(FileId file_id, Promise<Unit> promise, const char *source) {