Move directory creation and database check to another thread.

This commit is contained in:
levlam 2022-06-21 02:02:48 +03:00
parent 1528cfb9da
commit ee926f86a3
5 changed files with 129 additions and 66 deletions

View File

@ -144,7 +144,6 @@
#include "td/utils/misc.h"
#include "td/utils/PathView.h"
#include "td/utils/port/IPAddress.h"
#include "td/utils/port/path.h"
#include "td/utils/port/SocketFd.h"
#include "td/utils/port/uname.h"
#include "td/utils/Random.h"
@ -3042,6 +3041,10 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
}
void Td::run_request(uint64 id, tl_object_ptr<td_api::Function> function) {
if (set_parameters_request_id_ > 0) {
pending_set_parameters_requests_.emplace_back(id, std::move(function));
return;
}
if (init_request_id_ > 0) {
pending_init_requests_.emplace_back(id, std::move(function));
return;
@ -3073,9 +3076,20 @@ void Td::run_request(uint64 id, tl_object_ptr<td_api::Function> function) {
switch (state_) {
case State::WaitParameters: {
switch (function_id) {
case td_api::setTdlibParameters::ID:
return answer_ok_query(
id, set_parameters(std::move(move_tl_object_as<td_api::setTdlibParameters>(function)->parameters_)));
case td_api::setTdlibParameters::ID: {
auto status = set_parameters(std::move(move_tl_object_as<td_api::setTdlibParameters>(function)->parameters_));
if (status.is_error()) {
return send_closure(actor_id(this), &Td::send_error, id, std::move(status));
}
VLOG(td_init) << "Begin to check parameters";
set_parameters_request_id_ = id;
auto promise =
PromiseCreator::lambda([actor_id = actor_id(this)](Result<TdDb::CheckedParameters> r_checked_parameters) {
send_closure(actor_id, &Td::on_parameters_checked, std::move(r_checked_parameters));
});
return TdDb::check_parameters(get_database_scheduler_id(), parameters_, std::move(promise));
}
default:
if (is_preinitialization_request(function_id)) {
break;
@ -3653,6 +3667,45 @@ int32 Td::get_database_scheduler_id() {
return min(current_scheduler_id + 1, scheduler_count - 1);
}
void Td::on_parameters_checked(Result<TdDb::CheckedParameters> r_checked_parameters) {
CHECK(set_parameters_request_id_ != 0);
if (r_checked_parameters.is_error()) {
send_closure(actor_id(this), &Td::send_error, set_parameters_request_id_,
Status::Error(400, r_checked_parameters.error().message()));
return finish_set_parameters();
}
auto checked_parameters = r_checked_parameters.move_as_ok();
parameters_.database_directory = std::move(checked_parameters.database_directory);
parameters_.files_directory = std::move(checked_parameters.files_directory);
is_database_encrypted_ = checked_parameters.is_database_encrypted;
state_ = State::Decrypt;
VLOG(td_init) << "Send authorizationStateWaitEncryptionKey";
send_closure(actor_id(this), &Td::send_update,
td_api::make_object<td_api::updateAuthorizationState>(
td_api::make_object<td_api::authorizationStateWaitEncryptionKey>(is_database_encrypted_)));
VLOG(td_init) << "Finish set parameters";
send_closure(actor_id(this), &Td::send_result, set_parameters_request_id_, td_api::make_object<td_api::ok>());
return finish_set_parameters();
}
void Td::finish_set_parameters() {
CHECK(set_parameters_request_id_ != 0);
set_parameters_request_id_ = 0;
if (pending_set_parameters_requests_.empty()) {
return;
}
VLOG(td_init) << "Continue to execute " << pending_set_parameters_requests_.size() << " pending requests";
auto requests = std::move(pending_set_parameters_requests_);
for (auto &request : requests) {
run_request(request.first, std::move(request.second));
}
CHECK(pending_set_parameters_requests_.size() < requests.size());
}
void Td::start_init(uint64 id, string &&key) {
VLOG(td_init) << "Begin to init database";
init_request_id_ = id;
@ -4213,35 +4266,6 @@ Status Td::fix_parameters(TdParameters &parameters) {
VLOG(td_init) << "Invalid api_hash";
return Status::Error(400, "Valid api_hash must be provided. Can be obtained at https://my.telegram.org");
}
auto prepare_dir = [](string dir) -> Result<string> {
CHECK(!dir.empty());
if (dir.back() != TD_DIR_SLASH) {
dir += TD_DIR_SLASH;
}
TRY_STATUS(mkpath(dir, 0750));
TRY_RESULT(real_dir, realpath(dir, true));
if (dir.back() != TD_DIR_SLASH) {
dir += TD_DIR_SLASH;
}
return real_dir;
};
auto r_database_directory = prepare_dir(parameters.database_directory);
if (r_database_directory.is_error()) {
VLOG(td_init) << "Invalid database_directory";
return Status::Error(400, PSLICE() << "Can't init database in the directory \"" << parameters.database_directory
<< "\": " << r_database_directory.error());
}
parameters.database_directory = r_database_directory.move_as_ok();
auto r_files_directory = prepare_dir(parameters.files_directory);
if (r_files_directory.is_error()) {
VLOG(td_init) << "Invalid files_directory";
return Status::Error(400, PSLICE() << "Can't init files directory \"" << parameters.files_directory
<< "\": " << r_files_directory.error());
}
parameters.files_directory = r_files_directory.move_as_ok();
return Status::OK();
}
@ -4252,8 +4276,8 @@ Status Td::set_parameters(td_api::object_ptr<td_api::tdlibParameters> parameters
return Status::Error(400, "Parameters aren't specified");
}
if (!clean_input_string(parameters->api_hash_) && !clean_input_string(parameters->system_language_code_) &&
!clean_input_string(parameters->device_model_) && !clean_input_string(parameters->system_version_) &&
if (!clean_input_string(parameters->api_hash_) || !clean_input_string(parameters->system_language_code_) ||
!clean_input_string(parameters->device_model_) || !clean_input_string(parameters->system_version_) ||
!clean_input_string(parameters->application_version_)) {
VLOG(td_init) << "Wrong string encoding";
return Status::Error(400, "Strings must be encoded in UTF-8");
@ -4271,11 +4295,7 @@ Status Td::set_parameters(td_api::object_ptr<td_api::tdlibParameters> parameters
parameters_.use_chat_info_db = parameters->use_chat_info_database_;
parameters_.use_message_db = parameters->use_message_database_;
VLOG(td_init) << "Fix parameters...";
TRY_STATUS(fix_parameters(parameters_));
VLOG(td_init) << "Check binlog encryption...";
TRY_RESULT(encryption_info, TdDb::check_encryption(parameters_));
is_database_encrypted_ = encryption_info.is_encrypted;
VLOG(td_init) << "Create MtprotoHeader::Options";
options_.api_id = parameters->api_id_;
@ -4306,12 +4326,6 @@ Status Td::set_parameters(td_api::object_ptr<td_api::tdlibParameters> parameters
options_.is_emulator = false;
options_.proxy = Proxy();
state_ = State::Decrypt;
VLOG(td_init) << "Send authorizationStateWaitEncryptionKey";
send_closure(actor_id(this), &Td::send_update,
td_api::make_object<td_api::updateAuthorizationState>(
td_api::make_object<td_api::authorizationStateWaitEncryptionKey>(is_database_encrypted_)));
VLOG(td_init) << "Finish set parameters";
return Status::OK();
}

View File

@ -296,6 +296,7 @@ class Td final : public Actor {
enum class State : int32 { WaitParameters, Decrypt, Run, Close } state_ = State::WaitParameters;
uint64 init_request_id_ = 0;
uint64 set_parameters_request_id_ = 0;
bool is_database_encrypted_ = false;
FlatHashMap<uint64, std::shared_ptr<ResultHandler>> result_handlers_;
@ -321,6 +322,7 @@ class Td final : public Actor {
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_preauthentication_requests_;
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_set_parameters_requests_;
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_init_requests_;
template <class T>
@ -1428,6 +1430,10 @@ class Td final : public Actor {
static int32 get_database_scheduler_id();
void on_parameters_checked(Result<TdDb::CheckedParameters> r_checked_parameters);
void finish_set_parameters();
void start_init(uint64 id, string &&key);
void init(Result<TdDb::OpenedDatabase> r_opened_database);

View File

@ -51,19 +51,6 @@ std::string get_sqlite_path(const TdParameters &parameters) {
return parameters.database_directory + db_name + ".sqlite";
}
Result<TdDb::EncryptionInfo> check_encryption(string path) {
Binlog binlog;
auto status = binlog.init(std::move(path), Binlog::Callback());
if (status.is_error() && status.code() != Binlog::Error::WrongPassword) {
LOG(WARNING) << "Failed to check binlog: " << status;
return Status::Error(400, status.message());
}
TdDb::EncryptionInfo info;
info.is_encrypted = binlog.get_info().wrong_password;
binlog.close(false /*need_sync*/).ensure();
return info;
}
Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_pmc, BinlogKeyValue<Binlog> &config_pmc,
TdDb::OpenedDatabase &events, DbKey key) {
auto callback = [&](const BinlogEvent &event) {
@ -495,8 +482,62 @@ void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<
TdDb::TdDb() = default;
TdDb::~TdDb() = default;
Result<TdDb::EncryptionInfo> TdDb::check_encryption(const TdParameters &parameters) {
return ::td::check_encryption(get_binlog_path(parameters));
void TdDb::check_parameters(int32 scheduler_id, TdParameters parameters, Promise<CheckedParameters> promise) {
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
class Worker final : public Actor {
public:
void run(TdParameters parameters, Promise<CheckedParameters> promise) {
TdDb::check_parameters(-1, std::move(parameters), std::move(promise));
stop();
}
};
send_closure(create_actor_on_scheduler<Worker>("Worker", scheduler_id), &Worker::run, std::move(parameters),
std::move(promise));
return;
}
CheckedParameters result;
auto prepare_dir = [](string dir) -> Result<string> {
CHECK(!dir.empty());
if (dir.back() != TD_DIR_SLASH) {
dir += TD_DIR_SLASH;
}
TRY_STATUS(mkpath(dir, 0750));
TRY_RESULT(real_dir, realpath(dir, true));
if (dir.back() != TD_DIR_SLASH) {
dir += TD_DIR_SLASH;
}
return real_dir;
};
auto r_database_directory = prepare_dir(parameters.database_directory);
if (r_database_directory.is_error()) {
VLOG(td_init) << "Invalid database_directory";
return promise.set_error(Status::Error(PSLICE()
<< "Can't init database in the directory \"" << parameters.database_directory
<< "\": " << r_database_directory.error()));
}
result.database_directory = r_database_directory.move_as_ok();
parameters.database_directory = result.database_directory;
auto r_files_directory = prepare_dir(parameters.files_directory);
if (r_files_directory.is_error()) {
VLOG(td_init) << "Invalid files_directory";
return promise.set_error(Status::Error(PSLICE() << "Can't init files directory \"" << parameters.files_directory
<< "\": " << r_files_directory.error()));
}
result.files_directory = r_files_directory.move_as_ok();
Binlog binlog;
auto status = binlog.init(get_binlog_path(parameters), Binlog::Callback());
if (status.is_error() && status.code() != Binlog::Error::WrongPassword) {
LOG(WARNING) << "Failed to check binlog: " << status;
return promise.set_error(std::move(status));
}
result.is_database_encrypted = binlog.get_info().wrong_password;
binlog.close(false /*need_sync*/).ensure();
promise.set_value(std::move(result));
}
void TdDb::change_key(DbKey key, Promise<> promise) {

View File

@ -48,6 +48,13 @@ class TdDb {
TdDb &operator=(TdDb &&) = delete;
~TdDb();
struct CheckedParameters {
string database_directory;
string files_directory;
bool is_database_encrypted{false};
};
static void check_parameters(int32 scheduler_id, TdParameters parameters, Promise<CheckedParameters> promise);
struct OpenedDatabase {
unique_ptr<TdDb> database;
@ -64,11 +71,6 @@ class TdDb {
};
static void open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise);
struct EncryptionInfo {
bool is_encrypted{false};
};
static Result<EncryptionInfo> check_encryption(const TdParameters &parameters);
static Status destroy(const TdParameters &parameters);
std::shared_ptr<FileDbInterface> get_file_db_shared();

View File

@ -440,7 +440,7 @@ class TestDownloadFile : public Task {
begin = end;
}
std::random_shuffle(as_mutable_span(ranges_), rnd);
random_shuffle(as_mutable_span(ranges_), rnd);
start_chunk();
}