Merge remote-tracking branch 'td/master'
This commit is contained in:
commit
f4fd06684c
@ -104,6 +104,6 @@ std::shared_ptr<DialogDbSyncSafeInterface> create_dialog_db_sync(
|
|||||||
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
|
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
|
||||||
|
|
||||||
std::shared_ptr<DialogDbAsyncInterface> create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,
|
std::shared_ptr<DialogDbAsyncInterface> create_dialog_db_async(std::shared_ptr<DialogDbSyncSafeInterface> sync_db,
|
||||||
int32 scheduler_id);
|
int32 scheduler_id = -1);
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -205,6 +205,6 @@ std::shared_ptr<MessagesDbSyncSafeInterface> create_messages_db_sync(
|
|||||||
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
|
std::shared_ptr<SqliteConnectionSafe> sqlite_connection);
|
||||||
|
|
||||||
std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
|
std::shared_ptr<MessagesDbAsyncInterface> create_messages_db_async(std::shared_ptr<MessagesDbSyncSafeInterface> sync_db,
|
||||||
int32 scheduler_id);
|
int32 scheduler_id = -1);
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -273,7 +273,7 @@ void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
|
|||||||
lock.set_value(Unit());
|
lock.set_value(Unit());
|
||||||
}
|
}
|
||||||
|
|
||||||
Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters ¶meters, const DbKey &key, const DbKey &old_key,
|
Status TdDb::init_sqlite(const TdParameters ¶meters, const DbKey &key, const DbKey &old_key,
|
||||||
BinlogKeyValue<Binlog> &binlog_pmc) {
|
BinlogKeyValue<Binlog> &binlog_pmc) {
|
||||||
CHECK(!parameters.use_message_db || parameters.use_chat_info_db);
|
CHECK(!parameters.use_message_db || parameters.use_chat_info_db);
|
||||||
CHECK(!parameters.use_chat_info_db || parameters.use_file_db);
|
CHECK(!parameters.use_chat_info_db || parameters.use_file_db);
|
||||||
@ -366,37 +366,31 @@ Status TdDb::init_sqlite(int32 scheduler_id, const TdParameters ¶meters, con
|
|||||||
|
|
||||||
TRY_STATUS(db.exec("COMMIT TRANSACTION"));
|
TRY_STATUS(db.exec("COMMIT TRANSACTION"));
|
||||||
|
|
||||||
file_db_ = create_file_db(sql_connection_, scheduler_id);
|
file_db_ = create_file_db(sql_connection_);
|
||||||
|
|
||||||
common_kv_safe_ = std::make_shared<SqliteKeyValueSafe>("common", sql_connection_);
|
common_kv_safe_ = std::make_shared<SqliteKeyValueSafe>("common", sql_connection_);
|
||||||
common_kv_async_ = create_sqlite_key_value_async(common_kv_safe_, scheduler_id);
|
common_kv_async_ = create_sqlite_key_value_async(common_kv_safe_);
|
||||||
|
|
||||||
if (use_dialog_db) {
|
if (use_dialog_db) {
|
||||||
dialog_db_sync_safe_ = create_dialog_db_sync(sql_connection_);
|
dialog_db_sync_safe_ = create_dialog_db_sync(sql_connection_);
|
||||||
dialog_db_async_ = create_dialog_db_async(dialog_db_sync_safe_, scheduler_id);
|
dialog_db_async_ = create_dialog_db_async(dialog_db_sync_safe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (use_downloads_db) {
|
if (use_downloads_db) {
|
||||||
messages_db_sync_safe_ = create_messages_db_sync(sql_connection_);
|
messages_db_sync_safe_ = create_messages_db_sync(sql_connection_);
|
||||||
messages_db_async_ = create_messages_db_async(messages_db_sync_safe_, scheduler_id);
|
messages_db_async_ = create_messages_db_async(messages_db_sync_safe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
|
void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
|
||||||
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
|
Scheduler::instance()->run_on_scheduler(
|
||||||
class Worker final : public Actor {
|
scheduler_id, [parameters = std::move(parameters), key = std::move(key), promise = std::move(promise)](
|
||||||
public:
|
Unit) mutable { TdDb::open_impl(std::move(parameters), std::move(key), std::move(promise)); });
|
||||||
void open(TdParameters &¶meters, DbKey &&key, Promise<OpenedDatabase> &&promise) {
|
}
|
||||||
TdDb::open(-1, std::move(parameters), std::move(key), std::move(promise));
|
|
||||||
stop();
|
void TdDb::open_impl(TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise) {
|
||||||
}
|
|
||||||
};
|
|
||||||
send_closure(create_actor_on_scheduler<Worker>("Worker", scheduler_id), &Worker::open, std::move(parameters),
|
|
||||||
std::move(key), std::move(promise));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
OpenedDatabase result;
|
OpenedDatabase result;
|
||||||
|
|
||||||
// Init pmc
|
// Init pmc
|
||||||
@ -440,7 +434,7 @@ void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<
|
|||||||
}
|
}
|
||||||
VLOG(td_init) << "Start to init database";
|
VLOG(td_init) << "Start to init database";
|
||||||
auto db = make_unique<TdDb>();
|
auto db = make_unique<TdDb>();
|
||||||
auto init_sqlite_status = db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
|
auto init_sqlite_status = db->init_sqlite(parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc);
|
||||||
VLOG(td_init) << "Finish to init database";
|
VLOG(td_init) << "Finish to init database";
|
||||||
if (init_sqlite_status.is_error()) {
|
if (init_sqlite_status.is_error()) {
|
||||||
LOG(ERROR) << "Destroy bad SQLite database because of " << init_sqlite_status;
|
LOG(ERROR) << "Destroy bad SQLite database because of " << init_sqlite_status;
|
||||||
@ -448,7 +442,7 @@ void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<
|
|||||||
db->sql_connection_->get().close();
|
db->sql_connection_->get().close();
|
||||||
}
|
}
|
||||||
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
|
SqliteDb::destroy(get_sqlite_path(parameters)).ignore();
|
||||||
TRY_STATUS_PROMISE(promise, db->init_sqlite(scheduler_id, parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
|
TRY_STATUS_PROMISE(promise, db->init_sqlite(parameters, new_sqlite_key, old_sqlite_key, *binlog_pmc));
|
||||||
}
|
}
|
||||||
if (drop_sqlite_key) {
|
if (drop_sqlite_key) {
|
||||||
binlog_pmc->erase("sqlite_key");
|
binlog_pmc->erase("sqlite_key");
|
||||||
@ -471,7 +465,7 @@ void TdDb::open(int32 scheduler_id, TdParameters parameters, DbKey key, Promise<
|
|||||||
|
|
||||||
CHECK(binlog_ptr != nullptr);
|
CHECK(binlog_ptr != nullptr);
|
||||||
VLOG(td_init) << "Create concurrent_binlog";
|
VLOG(td_init) << "Create concurrent_binlog";
|
||||||
auto concurrent_binlog = std::make_shared<ConcurrentBinlog>(unique_ptr<Binlog>(binlog_ptr), scheduler_id);
|
auto concurrent_binlog = std::make_shared<ConcurrentBinlog>(unique_ptr<Binlog>(binlog_ptr));
|
||||||
|
|
||||||
VLOG(td_init) << "Init concurrent_binlog_pmc";
|
VLOG(td_init) << "Init concurrent_binlog_pmc";
|
||||||
concurrent_binlog_pmc->external_init_finish(concurrent_binlog);
|
concurrent_binlog_pmc->external_init_finish(concurrent_binlog);
|
||||||
@ -491,18 +485,13 @@ TdDb::TdDb() = default;
|
|||||||
TdDb::~TdDb() = default;
|
TdDb::~TdDb() = default;
|
||||||
|
|
||||||
void TdDb::check_parameters(int32 scheduler_id, TdParameters parameters, Promise<CheckedParameters> promise) {
|
void TdDb::check_parameters(int32 scheduler_id, TdParameters parameters, Promise<CheckedParameters> promise) {
|
||||||
if (scheduler_id >= 0 && Scheduler::instance()->sched_id() != scheduler_id) {
|
Scheduler::instance()->run_on_scheduler(
|
||||||
class Worker final : public Actor {
|
scheduler_id, [parameters = std::move(parameters), promise = std::move(promise)](Unit) mutable {
|
||||||
public:
|
TdDb::check_parameters_impl(std::move(parameters), std::move(promise));
|
||||||
void run(TdParameters parameters, Promise<CheckedParameters> promise) {
|
});
|
||||||
TdDb::check_parameters(-1, std::move(parameters), std::move(promise));
|
}
|
||||||
stop();
|
|
||||||
}
|
void TdDb::check_parameters_impl(TdParameters parameters, Promise<CheckedParameters> promise) {
|
||||||
};
|
|
||||||
send_closure(create_actor_on_scheduler<Worker>("Worker", scheduler_id), &Worker::run, std::move(parameters),
|
|
||||||
std::move(promise));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
CheckedParameters result;
|
CheckedParameters result;
|
||||||
|
|
||||||
auto prepare_dir = [](string dir) -> Result<string> {
|
auto prepare_dir = [](string dir) -> Result<string> {
|
||||||
|
@ -124,7 +124,11 @@ class TdDb {
|
|||||||
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
|
std::shared_ptr<BinlogKeyValue<ConcurrentBinlog>> config_pmc_;
|
||||||
std::shared_ptr<ConcurrentBinlog> binlog_;
|
std::shared_ptr<ConcurrentBinlog> binlog_;
|
||||||
|
|
||||||
Status init_sqlite(int32 scheduler_id, const TdParameters ¶meters, const DbKey &key, const DbKey &old_key,
|
static void open_impl(TdParameters parameters, DbKey key, Promise<OpenedDatabase> &&promise);
|
||||||
|
|
||||||
|
static void check_parameters_impl(TdParameters parameters, Promise<CheckedParameters> promise);
|
||||||
|
|
||||||
|
Status init_sqlite(const TdParameters ¶meters, const DbKey &key, const DbKey &old_key,
|
||||||
BinlogKeyValue<Binlog> &binlog_pmc);
|
BinlogKeyValue<Binlog> &binlog_pmc);
|
||||||
|
|
||||||
void do_close(Promise<> on_finished, bool destroy_flag);
|
void do_close(Promise<> on_finished, bool destroy_flag);
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include "td/utils/port/Poll.h"
|
#include "td/utils/port/Poll.h"
|
||||||
#include "td/utils/port/PollFlags.h"
|
#include "td/utils/port/PollFlags.h"
|
||||||
#include "td/utils/port/thread_local.h"
|
#include "td/utils/port/thread_local.h"
|
||||||
|
#include "td/utils/Promise.h"
|
||||||
#include "td/utils/Slice.h"
|
#include "td/utils/Slice.h"
|
||||||
#include "td/utils/Time.h"
|
#include "td/utils/Time.h"
|
||||||
#include "td/utils/type_traits.h"
|
#include "td/utils/type_traits.h"
|
||||||
@ -98,6 +99,8 @@ class Scheduler {
|
|||||||
void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
||||||
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
|
||||||
|
|
||||||
|
void run_on_scheduler(int32 sched_id, Promise<Unit> action); // TODO Action
|
||||||
|
|
||||||
template <ActorSendType send_type, class EventT>
|
template <ActorSendType send_type, class EventT>
|
||||||
void send_lambda(ActorRef actor_ref, EventT &&lambda);
|
void send_lambda(ActorRef actor_ref, EventT &&lambda);
|
||||||
|
|
||||||
|
@ -339,6 +339,28 @@ void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Scheduler::run_on_scheduler(int32 sched_id, Promise<Unit> action) {
|
||||||
|
if (sched_id >= 0 && sched_id_ != sched_id) {
|
||||||
|
class Worker final : public Actor {
|
||||||
|
public:
|
||||||
|
explicit Worker(Promise<Unit> action) : action_(std::move(action)) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Promise<Unit> action_;
|
||||||
|
|
||||||
|
void start_up() final {
|
||||||
|
action_.set_value(Unit());
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
create_actor_on_scheduler<Worker>("RunOnSchedulerWorker", sched_id, std::move(action)).release();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
action.set_value(Unit());
|
||||||
|
}
|
||||||
|
|
||||||
void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
|
void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
|
||||||
if (!actor_info->is_running()) {
|
if (!actor_info->is_running()) {
|
||||||
auto node = actor_info->get_list_node();
|
auto node = actor_info->get_list_node();
|
||||||
|
@ -76,7 +76,9 @@ Status SqliteDb::init(CSlice path, bool allow_creation) {
|
|||||||
auto database_stat = stat(path);
|
auto database_stat = stat(path);
|
||||||
if (database_stat.is_error()) {
|
if (database_stat.is_error()) {
|
||||||
if (!allow_creation) {
|
if (!allow_creation) {
|
||||||
LOG(FATAL) << "Database was deleted during execution and can't be recreated: " << database_stat.error();
|
bool was_destroyed = detail::RawSqliteDb::was_any_database_destroyed();
|
||||||
|
auto reason = was_destroyed ? Slice("was corrupted and deleted") : Slice("disappeared");
|
||||||
|
LOG(FATAL) << "Database " << reason << " during execution and can't be recreated: " << database_stat.error();
|
||||||
}
|
}
|
||||||
TRY_STATUS(destroy(path));
|
TRY_STATUS(destroy(path));
|
||||||
}
|
}
|
||||||
|
@ -63,8 +63,6 @@ class SqliteDb {
|
|||||||
static Result<SqliteDb> change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
|
static Result<SqliteDb> change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
|
||||||
const DbKey &old_db_key);
|
const DbKey &old_db_key);
|
||||||
|
|
||||||
Status last_error();
|
|
||||||
|
|
||||||
sqlite3 *get_native() const {
|
sqlite3 *get_native() const {
|
||||||
return raw_->db();
|
return raw_->db();
|
||||||
}
|
}
|
||||||
|
@ -14,9 +14,13 @@
|
|||||||
#include "td/utils/port/path.h"
|
#include "td/utils/port/path.h"
|
||||||
#include "td/utils/port/Stat.h"
|
#include "td/utils/port/Stat.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
|
static std::atomic<bool> was_database_destroyed{false};
|
||||||
|
|
||||||
Status RawSqliteDb::last_error(sqlite3 *db, CSlice path) {
|
Status RawSqliteDb::last_error(sqlite3 *db, CSlice path) {
|
||||||
return Status::Error(PSLICE() << Slice(sqlite3_errmsg(db)) << " for database \"" << path << '"');
|
return Status::Error(PSLICE() << Slice(sqlite3_errmsg(db)) << " for database \"" << path << '"');
|
||||||
}
|
}
|
||||||
@ -36,12 +40,17 @@ Status RawSqliteDb::last_error() {
|
|||||||
//If database was corrupted, try to delete it.
|
//If database was corrupted, try to delete it.
|
||||||
auto code = sqlite3_errcode(db_);
|
auto code = sqlite3_errcode(db_);
|
||||||
if (code == SQLITE_CORRUPT) {
|
if (code == SQLITE_CORRUPT) {
|
||||||
|
was_database_destroyed.store(true, std::memory_order_relaxed);
|
||||||
destroy(path_).ignore();
|
destroy(path_).ignore();
|
||||||
}
|
}
|
||||||
|
|
||||||
return last_error(db_, path());
|
return last_error(db_, path());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RawSqliteDb::was_any_database_destroyed() {
|
||||||
|
return was_database_destroyed.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
RawSqliteDb::~RawSqliteDb() {
|
RawSqliteDb::~RawSqliteDb() {
|
||||||
auto rc = sqlite3_close(db_);
|
auto rc = sqlite3_close(db_);
|
||||||
LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_, path());
|
LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_, path());
|
||||||
|
@ -45,6 +45,8 @@ class RawSqliteDb {
|
|||||||
Status last_error();
|
Status last_error();
|
||||||
static Status last_error(sqlite3 *db, CSlice path);
|
static Status last_error(sqlite3 *db, CSlice path);
|
||||||
|
|
||||||
|
static bool was_any_database_destroyed();
|
||||||
|
|
||||||
bool on_begin() {
|
bool on_begin() {
|
||||||
begin_cnt_++;
|
begin_cnt_++;
|
||||||
return begin_cnt_ == 1;
|
return begin_cnt_ == 1;
|
||||||
|
@ -7,17 +7,17 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
//#include "td/utils/FlatHashMapChunks.h"
|
//#include "td/utils/FlatHashMapChunks.h"
|
||||||
//#include "td/utils/FlatHashTable.h"
|
#include "td/utils/FlatHashTable.h"
|
||||||
//#include "td/utils/MapNode.h"
|
#include "td/utils/MapNode.h"
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <unordered_map>
|
//#include <unordered_map>
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
template <class KeyT, class ValueT, class HashT = std::hash<KeyT>, class EqT = std::equal_to<KeyT>>
|
template <class KeyT, class ValueT, class HashT = std::hash<KeyT>, class EqT = std::equal_to<KeyT>>
|
||||||
//using FlatHashMap = FlatHashTable<MapNode<KeyT, ValueT>, HashT, EqT>;
|
using FlatHashMap = FlatHashTable<MapNode<KeyT, ValueT>, HashT, EqT>;
|
||||||
//using FlatHashMap = FlatHashMapChunks<KeyT, ValueT, HashT, EqT>;
|
//using FlatHashMap = FlatHashMapChunks<KeyT, ValueT, HashT, EqT>;
|
||||||
using FlatHashMap = std::unordered_map<KeyT, ValueT, HashT, EqT>;
|
//using FlatHashMap = std::unordered_map<KeyT, ValueT, HashT, EqT>;
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -7,17 +7,17 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
//#include "td/utils/FlatHashMapChunks.h"
|
//#include "td/utils/FlatHashMapChunks.h"
|
||||||
//#include "td/utils/FlatHashTable.h"
|
#include "td/utils/FlatHashTable.h"
|
||||||
//#include "td/utils/SetNode.h"
|
#include "td/utils/SetNode.h"
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <unordered_set>
|
//#include <unordered_set>
|
||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
template <class KeyT, class HashT = std::hash<KeyT>, class EqT = std::equal_to<KeyT>>
|
template <class KeyT, class HashT = std::hash<KeyT>, class EqT = std::equal_to<KeyT>>
|
||||||
//using FlatHashSet = FlatHashTable<SetNode<KeyT>, HashT, EqT>;
|
using FlatHashSet = FlatHashTable<SetNode<KeyT>, HashT, EqT>;
|
||||||
//using FlatHashSet = FlatHashSetChunks<KeyT, HashT, EqT>;
|
//using FlatHashSet = FlatHashSetChunks<KeyT, HashT, EqT>;
|
||||||
using FlatHashSet = std::unordered_set<KeyT, HashT, EqT>;
|
//using FlatHashSet = std::unordered_set<KeyT, HashT, EqT>;
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
Loading…
Reference in New Issue
Block a user