tdlight/td/telegram/TdDb.cpp
levlam 98aba6304c Persistent getDialogQuery.
GitOrigin-RevId: e882cd049aac7aa86cf03750a78c3b92635f5379
2018-06-01 19:58:23 +03:00

432 lines
14 KiB
C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/TdDb.h"
#include "td/telegram/DialogDb.h"
#include "td/telegram/files/FileDb.h"
#include "td/telegram/logevent/LogEvent.h"
#include "td/telegram/MessagesDb.h"
#include "td/telegram/TdParameters.h"
#include "td/telegram/Version.h"
#include "td/actor/actor.h"
#include "td/actor/MultiPromise.h"
#include "td/db/BinlogKeyValue.h"
#include "td/utils/logging.h"
#include "td/utils/port/path.h"
#include "td/utils/Random.h"
namespace td {
namespace {
std::string get_binlog_path(const TdParameters &parameters) {
return PSTRING() << parameters.database_directory << "td" << (parameters.use_test_dc ? "_test" : "") << ".binlog";
}
std::string get_sqlite_path(const TdParameters &parameters) {
const string db_name = "db" + (parameters.use_test_dc ? string("_test") : string());
return parameters.database_directory + db_name + ".sqlite";
}
Result<EncryptionInfo> check_encryption(string path) {
Binlog binlog;
auto status = binlog.init(path, Binlog::Callback());
if (status.is_error() && status.code() != Binlog::Error::WrongPassword) {
return Status::Error(400, status.message());
}
EncryptionInfo info;
info.is_encrypted = binlog.get_info().wrong_password;
binlog.close(false /*need_sync*/).ensure();
return info;
}
Status init_binlog(Binlog &binlog, string path, BinlogKeyValue<Binlog> &binlog_pmc, BinlogKeyValue<Binlog> &config_pmc,
TdDb::Events &events, DbKey key) {
auto callback = [&](const BinlogEvent &event) {
switch (event.type_) {
case LogEvent::HandlerType::SecretChats:
events.to_secret_chats_manager.push_back(event.clone());
break;
case LogEvent::HandlerType::Users:
events.user_events.push_back(event.clone());
break;
case LogEvent::HandlerType::Chats:
events.chat_events.push_back(event.clone());
break;
case LogEvent::HandlerType::Channels:
events.channel_events.push_back(event.clone());
break;
case LogEvent::HandlerType::SecretChatInfos:
events.secret_chat_events.push_back(event.clone());
break;
case LogEvent::HandlerType::WebPages:
events.web_page_events.push_back(event.clone());
break;
case LogEvent::HandlerType::SendMessage:
case LogEvent::HandlerType::DeleteMessage:
case LogEvent::HandlerType::DeleteMessagesFromServer:
case LogEvent::HandlerType::ReadHistoryOnServer:
case LogEvent::HandlerType::ReadMessageContentsOnServer:
case LogEvent::HandlerType::ForwardMessages:
case LogEvent::HandlerType::SendBotStartMessage:
case LogEvent::HandlerType::SendScreenshotTakenNotificationMessage:
case LogEvent::HandlerType::SendInlineQueryResultMessage:
case LogEvent::HandlerType::DeleteDialogHistoryFromServer:
case LogEvent::HandlerType::ReadAllDialogMentionsOnServer:
case LogEvent::HandlerType::DeleteAllChannelMessagesFromUserOnServer:
case LogEvent::HandlerType::ToggleDialogIsPinnedOnServer:
case LogEvent::HandlerType::ReorderPinnedDialogsOnServer:
case LogEvent::HandlerType::SaveDialogDraftMessageOnServer:
case LogEvent::HandlerType::UpdateDialogNotificationSettingsOnServer:
case LogEvent::HandlerType::UpdateScopeNotificationSettingsOnServer:
case LogEvent::HandlerType::ResetAllNotificationSettingsOnServer:
case LogEvent::HandlerType::ChangeDialogReportSpamStateOnServer:
case LogEvent::HandlerType::GetDialogFromServer:
case LogEvent::HandlerType::GetChannelDifference:
events.to_messages_manager.push_back(event.clone());
break;
case LogEvent::HandlerType::BinlogPmcMagic:
binlog_pmc.external_init_handle(event);
break;
case LogEvent::HandlerType::ConfigPmcMagic:
config_pmc.external_init_handle(event);
break;
default:
LOG(FATAL) << "Unsupported logevent type " << event.type_;
}
};
auto binlog_info = binlog.init(std::move(path), callback, std::move(key));
if (binlog_info.is_error()) {
return binlog_info.move_as_error();
}
return Status::OK();
}
Status init_db(SqliteDb &db) {
TRY_STATUS(db.exec("PRAGMA encoding=\"UTF-8\""));
TRY_STATUS(db.exec("PRAGMA journal_mode=WAL"));
TRY_STATUS(db.exec("PRAGMA synchronous=NORMAL"));
TRY_STATUS(db.exec("PRAGMA temp_store=MEMORY"));
TRY_STATUS(db.exec("PRAGMA secure_delete=1"));
return Status::OK();
}
} // namespace
std::shared_ptr<FileDbInterface> TdDb::get_file_db_shared() {
return file_db_;
}
std::shared_ptr<SqliteConnectionSafe> &TdDb::get_sqlite_connection_safe() {
return sql_connection_;
}
ConcurrentBinlog *TdDb::get_binlog() {
CHECK(binlog_);
return binlog_.get();
}
BinlogPmc TdDb::get_binlog_pmc_shared() {
return binlog_pmc_;
}
BinlogPmcPtr TdDb::get_binlog_pmc() {
CHECK(binlog_pmc_);
return binlog_pmc_.get();
}
BinlogPmcPtr TdDb::get_config_pmc() {
CHECK(config_pmc_);
return config_pmc_.get();
}
BigPmcPtr TdDb::get_sqlite_sync_pmc() {
CHECK(common_kv_safe_);
return &common_kv_safe_->get();
}
SqliteKeyValueAsyncInterface *TdDb::get_sqlite_pmc() {
CHECK(common_kv_async_);
return common_kv_async_.get();
}
MessagesDbSyncInterface *TdDb::get_messages_db_sync() {
return &messages_db_sync_safe_->get();
}
MessagesDbAsyncInterface *TdDb::get_messages_db_async() {
return messages_db_async_.get();
}
DialogDbSyncInterface *TdDb::get_dialog_db_sync() {
return &dialog_db_sync_safe_->get();
}
DialogDbAsyncInterface *TdDb::get_dialog_db_async() {
return dialog_db_async_.get();
}
CSlice TdDb::binlog_path() const {
return binlog_->get_path();
}
CSlice TdDb::sqlite_path() const {
return sqlite_path_;
}
void TdDb::flush_all() {
if (messages_db_async_) {
messages_db_async_->force_flush();
}
binlog_->force_flush();
}
void TdDb::close_all(Promise<> on_finished) {
do_close(std::move(on_finished), false /*destroy_flag*/);
}
void TdDb::close_and_destroy_all(Promise<> on_finished) {
do_close(std::move(on_finished), true /*destroy_flag*/);
}
void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
MultiPromiseActorSafe mpas;
mpas.add_promise(PromiseCreator::lambda(
[promise = std::move(on_finished), sql_connection = std::move(sql_connection_), destroy_flag](Unit) mutable {
if (sql_connection) {
CHECK(sql_connection.unique()) << sql_connection.use_count();
if (destroy_flag) {
sql_connection->close_and_destroy();
} else {
sql_connection->close();
}
sql_connection.reset();
}
promise.set_value(Unit());
}));
auto lock = mpas.get_promise();
if (file_db_) {
file_db_->close(mpas.get_promise());
file_db_.reset();
}
common_kv_safe_.reset();
if (common_kv_async_) {
common_kv_async_->close(mpas.get_promise());
}
messages_db_sync_safe_.reset();
if (messages_db_async_) {
messages_db_async_->close(mpas.get_promise());
}
dialog_db_sync_safe_.reset();
if (dialog_db_async_) {
dialog_db_async_->close(mpas.get_promise());
}
// binlog_pmc is dependent on binlog_ and anyway it doesn't support close_and_destroy
CHECK(binlog_pmc_.unique());
binlog_pmc_.reset();
CHECK(config_pmc_.unique());
config_pmc_.reset();
if (binlog_) {
if (destroy_flag) {
binlog_->close_and_destroy(mpas.get_promise());
} else {
binlog_->close(mpas.get_promise());
}
binlog_.reset();
}
}
Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters &parameters, DbKey key, DbKey old_key,
BinlogKeyValue<Binlog> &binlog_pmc) {
CHECK(!parameters.use_message_db || parameters.use_chat_info_db);
CHECK(!parameters.use_chat_info_db || parameters.use_file_db);
const string sql_db_name = get_sqlite_path(parameters);
bool use_sqlite = parameters.use_file_db;
bool use_file_db = parameters.use_file_db;
bool use_dialog_db = parameters.use_message_db;
bool use_message_db = parameters.use_message_db;
if (!use_sqlite) {
unlink(sql_db_name).ignore();
return Status::OK();
}
sqlite_path_ = sql_db_name;
TRY_STATUS(SqliteDb::change_key(sqlite_path_, key, old_key));
sql_connection_ = std::make_shared<SqliteConnectionSafe>(sql_db_name, key);
auto &db = sql_connection_->get();
TRY_STATUS(init_db(db));
// Init databases
// Do initialization once and before everything else to avoid "database is locked" error.
// Must be in a transaction
// NB: when database is dropped we should also drop corresponding binlog events
TRY_STATUS(db.exec("BEGIN TRANSACTION"));
// Get 'PRAGMA user_version'
TRY_RESULT(user_version, db.user_version());
LOG(WARNING) << "got PRAGMA user_version = " << user_version;
// init DialogDb
bool dialog_db_was_created = false;
if (use_dialog_db) {
TRY_STATUS(init_dialog_db(db, user_version, dialog_db_was_created));
} else {
TRY_STATUS(drop_dialog_db(db, user_version));
}
// init MessagesDb
if (use_message_db) {
TRY_STATUS(init_messages_db(db, user_version));
} else {
TRY_STATUS(drop_messages_db(db, user_version));
}
// init filesDb
if (use_file_db) {
TRY_STATUS(init_file_db(db, user_version));
} else {
TRY_STATUS(drop_file_db(db, user_version));
}
// Update 'PRAGMA user_version'
auto db_version = current_db_version();
if (db_version != user_version) {
LOG(WARNING) << "set PRAGMA user_version = " << db_version;
TRY_STATUS(db.set_user_version(db_version));
}
if (dialog_db_was_created) {
binlog_pmc.erase("unread_message_count");
binlog_pmc.erase("last_server_dialog_date");
binlog_pmc.erase("promoted_dialog_id");
}
if (db_version == 0) {
binlog_pmc.erase_by_prefix("top_dialogs");
binlog_pmc.erase("next_contacts_sync_date");
}
binlog_pmc.force_sync({});
TRY_STATUS(db.exec("COMMIT TRANSACTION"));
file_db_ = create_file_db(sql_connection_, scheduler_id);
common_kv_safe_ = std::make_shared<SqliteKeyValueSafe>("common", sql_connection_);
common_kv_async_ = create_sqlite_key_value_async(common_kv_safe_, scheduler_id);
if (use_dialog_db) {
dialog_db_sync_safe_ = create_dialog_db_sync(sql_connection_);
dialog_db_async_ = create_dialog_db_async(dialog_db_sync_safe_, scheduler_id);
}
if (use_message_db) {
messages_db_sync_safe_ = create_messages_db_sync(sql_connection_);
messages_db_async_ = create_messages_db_async(messages_db_sync_safe_, scheduler_id);
}
return Status::OK();
}
Status TdDb::init(int32 scheduler_id, const TdParameters &parameters, DbKey key, Events &events) {
// Init pmc
Binlog *binlog_ptr = nullptr;
auto binlog = std::shared_ptr<Binlog>(new Binlog, [&](Binlog *ptr) { binlog_ptr = ptr; });
auto binlog_pmc = std::make_unique<BinlogKeyValue<Binlog>>();
auto config_pmc = std::make_unique<BinlogKeyValue<Binlog>>();
binlog_pmc->external_init_begin(static_cast<int32>(LogEvent::HandlerType::BinlogPmcMagic));
config_pmc->external_init_begin(static_cast<int32>(LogEvent::HandlerType::ConfigPmcMagic));
bool encrypt_binlog = !key.is_empty();
TRY_STATUS(init_binlog(*binlog, get_binlog_path(parameters), *binlog_pmc, *config_pmc, events, std::move(key)));
binlog_pmc->external_init_finish(binlog);
config_pmc->external_init_finish(binlog);
DbKey new_sqlite_key;
DbKey old_sqlite_key;
bool encrypt_sqlite = encrypt_binlog;
bool drop_sqlite_key = false;
auto sqlite_key = binlog_pmc->get("sqlite_key");
if (encrypt_sqlite) {
if (sqlite_key.empty()) {
sqlite_key = string(32, ' ');
Random::secure_bytes(sqlite_key);
binlog_pmc->set("sqlite_key", sqlite_key);
binlog_pmc->force_sync(Auto());
}
new_sqlite_key = DbKey::raw_key(std::move(sqlite_key));
} else {
if (!sqlite_key.empty()) {
old_sqlite_key = DbKey::raw_key(std::move(sqlite_key));
drop_sqlite_key = true;
}
}
auto init_sqlite_status = init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
if (init_sqlite_status.is_error()) {
LOG(ERROR) << "Destroy bad sqlite db because of: " << init_sqlite_status;
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
TRY_STATUS(init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
}
if (drop_sqlite_key) {
binlog_pmc->erase("sqlite_key");
binlog_pmc->force_sync(Auto());
}
auto concurrent_binlog_pmc = std::make_shared<BinlogKeyValue<ConcurrentBinlog>>();
concurrent_binlog_pmc->external_init_begin(binlog_pmc->get_magic());
concurrent_binlog_pmc->external_init_handle(std::move(*binlog_pmc));
auto concurrent_config_pmc = std::make_shared<BinlogKeyValue<ConcurrentBinlog>>();
concurrent_config_pmc->external_init_begin(config_pmc->get_magic());
concurrent_config_pmc->external_init_handle(std::move(*config_pmc));
binlog.reset();
binlog_pmc.reset();
config_pmc.reset();
CHECK(binlog_ptr != nullptr);
auto concurrent_binlog = std::make_shared<ConcurrentBinlog>(std::unique_ptr<Binlog>(binlog_ptr), scheduler_id);
concurrent_binlog_pmc->external_init_finish(concurrent_binlog);
concurrent_config_pmc->external_init_finish(concurrent_binlog);
binlog_pmc_ = std::move(concurrent_binlog_pmc);
config_pmc_ = std::move(concurrent_config_pmc);
binlog_ = std::move(concurrent_binlog);
return Status::OK();
}
TdDb::TdDb() = default;
TdDb::~TdDb() = default;
Result<std::unique_ptr<TdDb>> TdDb::open(int32 scheduler_id, const TdParameters &parameters, DbKey key,
Events &events) {
auto db = std::make_unique<TdDb>();
TRY_STATUS(db->init(scheduler_id, parameters, std::move(key), events));
return std::move(db);
}
Result<EncryptionInfo> TdDb::check_encryption(const TdParameters &parameters) {
return ::td::check_encryption(get_binlog_path(parameters));
}
void TdDb::change_key(DbKey key, Promise<> promise) {
get_binlog()->change_key(std::move(key), std::move(promise));
}
Status TdDb::destroy(const TdParameters &parameters) {
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
Binlog::destroy(get_binlog_path(parameters)).ignore();
return Status::OK();
}
void TdDb::with_db_path(std::function<void(CSlice)> callback) {
SqliteDb::with_db_path(sqlite_path(), callback);
callback(binlog_path());
}
} // namespace td