Close databases and binlog on another thread.

This commit is contained in:
levlam 2023-10-05 21:50:53 +03:00
parent b41a51f2ec
commit 9289270cd9
6 changed files with 21 additions and 28 deletions

View File

@ -6,7 +6,7 @@ emconfigure true 2> /dev/null || { echo 'emconfigure not found. Install emsdk an
OPENSSL=OpenSSL_1_1_0l
if [ ! -f $OPENSSL.tar.gz ]; then
echo "Downloading OpenSSL sources..."
wget https://github.com/openssl/openssl/archive/$OPENSSL.tar.gz
curl -sfLO https://github.com/openssl/openssl/archive/$OPENSSL.tar.gz
fi
rm -rf ./openssl-$OPENSSL
echo "Unpacking OpenSSL sources..."

View File

@ -38,13 +38,9 @@ void Global::log_out(Slice reason) {
send_closure(auth_manager_, &AuthManager::on_authorization_lost, reason.str());
}
void Global::close_all(Promise<> on_finished) {
td_db_->close_all(std::move(on_finished));
state_manager_.clear();
}
void Global::close_and_destroy_all(Promise<> on_finished) {
td_db_->close_and_destroy_all(std::move(on_finished));
void Global::close_all(bool destroy_flag, Promise<Unit> on_finished) {
td_db_->close(use_sqlite_pmc() ? get_database_scheduler_id() : get_slow_net_scheduler_id(), destroy_flag,
std::move(on_finished));
state_manager_.clear();
}

View File

@ -92,8 +92,7 @@ class Global final : public ActorContext {
void log_out(Slice reason);
void close_all(Promise<> on_finished);
void close_and_destroy_all(Promise<> on_finished);
void close_all(bool destroy_flag, Promise<> on_finished);
Status init(ActorId<Td> td, unique_ptr<TdDb> td_db_ptr) TD_WARN_UNUSED_RESULT;

View File

@ -3340,12 +3340,9 @@ void Td::dec_actor_refcnt() {
option_manager_.reset();
LOG(DEBUG) << "OptionManager was cleared" << timer;
Promise<> promise = PromiseCreator::lambda([actor_id = create_reference()](Unit) mutable { actor_id.reset(); });
if (destroy_flag_) {
G()->close_and_destroy_all(std::move(promise));
} else {
G()->close_all(std::move(promise));
}
G()->close_all(destroy_flag_,
PromiseCreator::lambda([actor_id = create_reference()](Unit) mutable { actor_id.reset(); }));
// NetQueryDispatcher will be closed automatically
close_flag_ = 4;
} else if (close_flag_ == 4) {

View File

@ -270,17 +270,19 @@ void TdDb::flush_all() {
binlog_->force_flush();
}
void TdDb::close_all(Promise<> on_finished) {
LOG(INFO) << "Close all databases";
do_close(std::move(on_finished), false /*destroy_flag*/);
void TdDb::close(int32 scheduler_id, bool destroy_flag, Promise<Unit> on_finished) {
Scheduler::instance()->run_on_scheduler(scheduler_id,
[this, destroy_flag, on_finished = std::move(on_finished)](Unit) mutable {
do_close(destroy_flag, std::move(on_finished));
});
}
void TdDb::close_and_destroy_all(Promise<> on_finished) {
LOG(INFO) << "Destroy all databases";
do_close(std::move(on_finished), true /*destroy_flag*/);
}
void TdDb::do_close(Promise<> on_finished, bool destroy_flag) {
void TdDb::do_close(bool destroy_flag, Promise<Unit> on_finished) {
if (destroy_flag) {
LOG(INFO) << "Destroy all databases";
} else {
LOG(INFO) << "Close all databases";
}
MultiPromiseActorSafe mpas{"TdDbCloseMultiPromiseActor"};
mpas.add_promise(PromiseCreator::lambda(
[promise = std::move(on_finished), sql_connection = std::move(sql_connection_), destroy_flag](Unit) mutable {

View File

@ -134,8 +134,7 @@ class TdDb {
void flush_all();
void close_all(Promise<> on_finished);
void close_and_destroy_all(Promise<> on_finished);
void close(int32 scheduler_id, bool destroy_flag, Promise<Unit> on_finished);
MessageDbSyncInterface *get_message_db_sync();
MessageDbAsyncInterface *get_message_db_async();
@ -190,7 +189,7 @@ class TdDb {
Status init_sqlite(const Parameters &parameters, const DbKey &key, const DbKey &old_key,
BinlogKeyValue<Binlog> &binlog_pmc);
void do_close(Promise<> on_finished, bool destroy_flag);
void do_close(bool destroy_flag, Promise<Unit> on_finished);
};
} // namespace td