Support asynchronous TdDb::open.

This commit is contained in:
levlam 2022-06-10 16:44:53 +03:00
parent ef3900a853
commit f95f76c4de
5 changed files with 96 additions and 35 deletions

View File

@ -3033,11 +3033,21 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
}
VLOG(td_requests) << "Receive request " << id << ": " << to_string(function);
int32 function_id = function->get_id();
if (is_synchronous_request(function_id)) {
if (is_synchronous_request(function->get_id())) {
// send response synchronously
return send_result(id, static_request(std::move(function)));
}
run_request(id, std::move(function));
}
void Td::run_request(uint64 id, tl_object_ptr<td_api::Function> function) {
if (init_request_id_ > 0) {
pending_init_requests_.emplace_back(id, std::move(function));
return;
}
int32 function_id = function->get_id();
if (state_ != State::Run) {
switch (function_id) {
case td_api::getAuthorizationState::ID:
@ -3083,11 +3093,11 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
switch (function_id) {
case td_api::checkDatabaseEncryptionKey::ID: {
auto check_key = move_tl_object_as<td_api::checkDatabaseEncryptionKey>(function);
return answer_ok_query(id, init(as_db_key(std::move(check_key->encryption_key_))));
return start_init(id, std::move(check_key->encryption_key_));
}
case td_api::setDatabaseEncryptionKey::ID: {
auto set_key = move_tl_object_as<td_api::setDatabaseEncryptionKey>(function);
return answer_ok_query(id, init(as_db_key(std::move(set_key->new_encryption_key_))));
return start_init(id, std::move(set_key->new_encryption_key_));
}
case td_api::destroy::ID:
// need to send response synchronously before actual destroying
@ -3635,22 +3645,34 @@ void Td::complete_pending_preauthentication_requests(const T &func) {
}
}
Status Td::init(DbKey key) {
void Td::start_init(uint64 id, string &&key) {
VLOG(td_init) << "Begin to init database";
init_request_id_ = id;
auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result<TdDb::OpenedDatabase> r_opened_database) {
send_closure(actor_id, &Td::init, std::move(r_opened_database));
});
auto current_scheduler_id = Scheduler::instance()->sched_id();
auto scheduler_count = Scheduler::instance()->sched_count();
TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, as_db_key(std::move(key)),
std::move(promise));
}
VLOG(td_init) << "Begin to init database";
TdDb::Events events;
auto r_td_db = TdDb::open(min(current_scheduler_id + 1, scheduler_count - 1), parameters_, std::move(key), events);
if (r_td_db.is_error()) {
LOG(WARNING) << "Failed to open database: " << r_td_db.error();
return Status::Error(400, r_td_db.error().message());
void Td::init(Result<TdDb::OpenedDatabase> r_opened_database) {
CHECK(init_request_id_ != 0);
if (r_opened_database.is_error()) {
LOG(WARNING) << "Failed to open database: " << r_opened_database.error();
send_closure(actor_id(this), &Td::send_error, init_request_id_,
Status::Error(400, r_opened_database.error().message()));
return finish_init();
}
LOG(INFO) << "Successfully inited database in " << tag("database_directory", parameters_.database_directory)
<< " and " << tag("files_directory", parameters_.files_directory);
VLOG(td_init) << "Successfully inited database";
G()->init(parameters_, actor_id(this), r_td_db.move_as_ok()).ensure();
auto events = r_opened_database.move_as_ok();
G()->init(parameters_, actor_id(this), std::move(events.database)).ensure();
init_options_and_network();
@ -3693,6 +3715,9 @@ Status Td::init(DbKey key) {
G()->set_my_id(G()->shared_config().get_option_integer("my_id"));
auto current_scheduler_id = Scheduler::instance()->sched_id();
auto scheduler_count = Scheduler::instance()->sched_count();
storage_manager_ = create_actor<StorageManager>("StorageManager", create_reference(),
min(current_scheduler_id + 2, scheduler_count - 1));
G()->set_storage_manager(storage_manager_.get());
@ -3774,7 +3799,24 @@ Status Td::init(DbKey key) {
VLOG(td_init) << "Finish initialization";
state_ = State::Run;
return Status::OK();
send_closure(actor_id(this), &Td::send_result, init_request_id_, td_api::make_object<td_api::ok>());
return finish_init();
}
void Td::finish_init() {
CHECK(init_request_id_ > 0);
init_request_id_ = 0;
if (pending_init_requests_.empty()) {
return;
}
auto requests = std::move(pending_init_requests_);
for (auto &request : requests) {
run_request(request.first, std::move(request.second));
}
CHECK(pending_init_requests_.size() < requests.size());
}
void Td::init_options_and_network() {

View File

@ -13,6 +13,7 @@
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/td_api.h"
#include "td/telegram/TdCallback.h"
#include "td/telegram/TdDb.h"
#include "td/telegram/TdParameters.h"
#include "td/telegram/telegram_api.h"
#include "td/telegram/TermsOfService.h"
@ -255,6 +256,8 @@ class Td final : public Actor {
void on_connection_state_changed(ConnectionState new_state);
void run_request(uint64 id, tl_object_ptr<td_api::Function> function);
void send_result(uint64 id, tl_object_ptr<td_api::Object> object);
void send_error(uint64 id, Status error);
void send_error_impl(uint64 id, tl_object_ptr<td_api::error> error);
@ -291,6 +294,7 @@ class Td final : public Actor {
int close_flag_ = 0;
enum class State : int32 { WaitParameters, Decrypt, Run, Close } state_ = State::WaitParameters;
uint64 init_request_id_ = 0;
bool is_database_encrypted_ = false;
FlatHashMap<uint64, std::shared_ptr<ResultHandler>> result_handlers_;
@ -316,6 +320,8 @@ class Td final : public Actor {
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_preauthentication_requests_;
vector<std::pair<uint64, td_api::object_ptr<td_api::Function>>> pending_init_requests_;
template <class T>
void complete_pending_preauthentication_requests(const T &func);
@ -1394,14 +1400,27 @@ class Td final : public Actor {
static td_api::object_ptr<td_api::Object> do_static_request(td_api::testReturnError &request);
static DbKey as_db_key(string key);
Status init(DbKey key) TD_WARN_UNUSED_RESULT;
void start_init(uint64 id, string &&key);
void init(Result<TdDb::OpenedDatabase> r_opened_database);
void init_options_and_network();
void init_connection_creator();
void init_file_manager();
void init_managers();
void finish_init();
void clear();
void close_impl(bool destroy_flag);
static Status fix_parameters(TdParameters &parameters) TD_WARN_UNUSED_RESULT;
Status set_parameters(td_api::object_ptr<td_api::tdlibParameters> parameters) TD_WARN_UNUSED_RESULT;
static td_api::object_ptr<td_api::error> make_error(int32 code, CSlice error) {

View File

@ -64,7 +64,7 @@ Result<TdDb::EncryptionInfo> check_encryption(string path) {
}
Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_pmc, BinlogKeyValue<Binlog> &config_pmc,
TdDb::Events &events, DbKey key) {
TdDb::OpenedDatabase &events, DbKey key) {
auto callback = [&](const BinlogEvent &event) {
switch (event.type_) {
case LogEvent::HandlerType::SecretChats:
@ -388,7 +388,9 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters &parameters, con
return Status::OK();
}
Status TdDb::init(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events) {
void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
OpenedDatabase result;
// Init pmc
Binlog *binlog_ptr = nullptr;
auto binlog = std::shared_ptr<Binlog>(new Binlog, [&](Binlog *ptr) { binlog_ptr = ptr; });
@ -400,7 +402,8 @@ Status TdDb::init(int32 scheduler_id, const TdParameters &parameters, DbKey key,
bool encrypt_binlog = !key.is_empty();
VLOG(td_init) << "Start binlog loading";
TRY_STATUS(init_binlog(*binlog, get_binlog_path(parameters), *binlog_pmc, *config_pmc, events, std::move(key)));
TRY_STATUS_PROMISE(
promise, init_binlog(*binlog, get_binlog_path(parameters), *binlog_pmc, *config_pmc, result, std::move(key)));
VLOG(td_init) << "Finish binlog loading";
binlog_pmc->external_init_finish(binlog);
@ -428,15 +431,16 @@ Status TdDb::init(int32 scheduler_id, const TdParameters &parameters, DbKey key,
}
}
VLOG(td_init) << "Start to init database";
auto init_sqlite_status = init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
auto db = make_unique<TdDb>();
auto init_sqlite_status = db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
VLOG(td_init) << "Finish to init database";
if (init_sqlite_status.is_error()) {
LOG(ERROR) << "Destroy bad SQLite database because of " << init_sqlite_status;
if (sql_connection_ != nullptr) {
sql_connection_->get().close();
if (db->sql_connection_ != nullptr) {
db->sql_connection_->get().close();
}
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
TRY_STATUS(init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
TRY_STATUS_PROMISE(promise, db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
}
if (drop_sqlite_key) {
binlog_pmc->erase("sqlite_key");
@ -466,22 +470,18 @@ Status TdDb::init(int32 scheduler_id, const TdParameters &parameters, DbKey key,
VLOG(td_init) << "Init concurrent_config_pmc";
concurrent_config_pmc->external_init_finish(concurrent_binlog);
binlog_pmc_ = std::move(concurrent_binlog_pmc);
config_pmc_ = std::move(concurrent_config_pmc);
binlog_ = std::move(concurrent_binlog);
db->binlog_pmc_ = std::move(concurrent_binlog_pmc);
db->config_pmc_ = std::move(concurrent_config_pmc);
db->binlog_ = std::move(concurrent_binlog);
return Status::OK();
result.database = std::move(db);
promise.set_value(std::move(result));
}
TdDb::TdDb() = default;
TdDb::~TdDb() = default;
Result<unique_ptr<TdDb>> TdDb::open(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events) {
auto db = make_unique<TdDb>();
TRY_STATUS(db->init(scheduler_id, parameters, std::move(key), events));
return std::move(db);
}
Result<TdDb::EncryptionInfo> TdDb::check_encryption(const TdParameters &parameters) {
return ::td::check_encryption(get_binlog_path(parameters));
}

View File

@ -48,7 +48,9 @@ class TdDb {
TdDb &operator=(TdDb &&) = delete;
~TdDb();
struct Events {
struct OpenedDatabase {
unique_ptr<TdDb> database;
vector<BinlogEvent> to_secret_chats_manager;
vector<BinlogEvent> user_events;
vector<BinlogEvent> chat_events;
@ -60,7 +62,7 @@ class TdDb {
vector<BinlogEvent> to_notification_manager;
vector<BinlogEvent> to_notification_settings_manager;
};
static Result<unique_ptr<TdDb>> open(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events);
static void open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise);
struct EncryptionInfo {
bool is_encrypted{false};
@ -121,7 +123,6 @@ class TdDb {
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
std::shared_ptr<ConcurrentBinlog> binlog_;
Status init(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events);
Status init_sqlite(int32 scheduler_id, const TdParameters &parameters, const DbKey &key, const DbKey &old_key,
BinlogKeyValue<Binlog> &binlog_pmc);

View File

@ -28,7 +28,6 @@
#include "td/utils/algorithm.h"
#include "td/utils/as.h"
#include "td/utils/FloodControlGlobal.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"