From 8377726001a209d0d2c844ee4f4449ea383b4321 Mon Sep 17 00:00:00 2001 From: levlam Date: Fri, 2 Feb 2024 15:09:17 +0300 Subject: [PATCH] Add source to Binlog::sync. --- td/telegram/AuthManager.cpp | 2 +- td/telegram/ContactsManager.cpp | 42 ++++++++++++++------------ td/telegram/DeviceTokenManager.cpp | 2 +- td/telegram/SecretChatActor.cpp | 12 ++++---- td/telegram/TdDb.cpp | 6 ++-- tddb/td/db/BinlogKeyValue.h | 10 +++--- tddb/td/db/KeyValueSyncInterface.h | 2 +- tddb/td/db/binlog/Binlog.cpp | 18 ++++++----- tddb/td/db/binlog/Binlog.h | 4 +-- tddb/td/db/binlog/BinlogInterface.h | 2 +- tddb/td/db/binlog/ConcurrentBinlog.cpp | 15 +++++---- tddb/td/db/binlog/ConcurrentBinlog.h | 2 +- test/secret.cpp | 4 +-- 13 files changed, 65 insertions(+), 56 deletions(-) diff --git a/td/telegram/AuthManager.cpp b/td/telegram/AuthManager.cpp index 374381d6a..9a68d15e3 100644 --- a/td/telegram/AuthManager.cpp +++ b/td/telegram/AuthManager.cpp @@ -1172,7 +1172,7 @@ void AuthManager::destroy_auth_keys() { } }); G()->td_db()->get_binlog_pmc()->set("auth", "destroy"); - G()->td_db()->get_binlog_pmc()->force_sync(std::move(promise)); + G()->td_db()->get_binlog_pmc()->force_sync(std::move(promise), "destroy_auth_keys"); } void AuthManager::on_delete_account_result(NetQueryPtr &&net_query) { diff --git a/td/telegram/ContactsManager.cpp b/td/telegram/ContactsManager.cpp index e75fe7ea9..0eaeb2e99 100644 --- a/td/telegram/ContactsManager.cpp +++ b/td/telegram/ContactsManager.cpp @@ -5398,7 +5398,7 @@ void ContactsManager::set_my_id(UserId my_id) { my_id_ = my_id; G()->td_db()->get_binlog_pmc()->set("my_id", to_string(my_id.get())); td_->option_manager_->set_option_integer("my_id", my_id_.get()); - G()->td_db()->get_binlog_pmc()->force_sync(Promise()); + G()->td_db()->get_binlog_pmc()->force_sync(Promise(), "set_my_id"); } } @@ -8835,13 +8835,15 @@ void ContactsManager::on_import_contacts_finished(int64 random_id, vectoruse_chat_info_database()) { - G()->td_db()->get_binlog()->force_sync(PromiseCreator::lambda( - [log_event = log_event_store(all_imported_contacts_).as_slice().str()](Result<> result) mutable { - if (result.is_ok()) { - LOG(INFO) << "Save imported contacts to database"; - G()->td_db()->get_sqlite_pmc()->set("user_imported_contacts", std::move(log_event), Auto()); - } - })); + G()->td_db()->get_binlog()->force_sync( + PromiseCreator::lambda( + [log_event = log_event_store(all_imported_contacts_).as_slice().str()](Result<> result) mutable { + if (result.is_ok()) { + LOG(INFO) << "Save imported contacts to database"; + G()->td_db()->get_sqlite_pmc()->set("user_imported_contacts", std::move(log_event), Auto()); + } + }), + "on_import_contacts_finished"); } for (size_t i = 0; i < result_size; i++) { @@ -8961,17 +8963,19 @@ void ContactsManager::save_contacts_to_database() { transform(contacts_hints_.search_empty(100000).second, [](int64 key) { return UserId(key); }); G()->td_db()->get_binlog_pmc()->set("saved_contact_count", to_string(saved_contact_count_)); - G()->td_db()->get_binlog()->force_sync(PromiseCreator::lambda([user_ids = std::move(user_ids)](Result<> result) { - if (result.is_ok()) { - LOG(INFO) << "Saved contacts to database"; - G()->td_db()->get_sqlite_pmc()->set( - "user_contacts", log_event_store(user_ids).as_slice().str(), PromiseCreator::lambda([](Result<> result) { - if (result.is_ok()) { - send_closure(G()->contacts_manager(), &ContactsManager::save_next_contacts_sync_date); - } - })); - } - })); + G()->td_db()->get_binlog()->force_sync( + PromiseCreator::lambda([user_ids = std::move(user_ids)](Result<> result) { + if (result.is_ok()) { + LOG(INFO) << "Saved contacts to database"; + G()->td_db()->get_sqlite_pmc()->set( + "user_contacts", log_event_store(user_ids).as_slice().str(), PromiseCreator::lambda([](Result<> result) { + if (result.is_ok()) { + send_closure(G()->contacts_manager(), &ContactsManager::save_next_contacts_sync_date); + } + })); + } + }), + "save_contacts_to_database"); } void ContactsManager::on_get_contacts_failed(Status error) { diff --git a/td/telegram/DeviceTokenManager.cpp b/td/telegram/DeviceTokenManager.cpp index dfb0d0a0b..a6024312d 100644 --- a/td/telegram/DeviceTokenManager.cpp +++ b/td/telegram/DeviceTokenManager.cpp @@ -372,7 +372,7 @@ void DeviceTokenManager::save_info(int32 token_type) { } sync_cnt_++; G()->td_db()->get_binlog_pmc()->force_sync( - create_event_promise(self_closure(this, &DeviceTokenManager::dec_sync_cnt))); + create_event_promise(self_closure(this, &DeviceTokenManager::dec_sync_cnt)), "DeviceTokenManager::save_info"); } void DeviceTokenManager::dec_sync_cnt() { diff --git a/td/telegram/SecretChatActor.cpp b/td/telegram/SecretChatActor.cpp index 7474d5f46..421dc9f73 100644 --- a/td/telegram/SecretChatActor.cpp +++ b/td/telegram/SecretChatActor.cpp @@ -677,7 +677,7 @@ void SecretChatActor::cancel_chat(bool delete_history, bool is_already_discarded } }); - context_->binlog()->force_sync(std::move(on_sync)); + context_->binlog()->force_sync(std::move(on_sync), "cancel_chat"); yield(); } @@ -1072,7 +1072,7 @@ void SecretChatActor::do_outbound_message_impl(unique_ptrbinlog(), LogEvent::HandlerType::SecretChats, create_storer(*state->message)); LOG(INFO) << "Outbound secret message [save_log_event] start " << tag("log_event_id", log_event_id); - context_->binlog()->force_sync(std::move(save_log_event_finish)); + context_->binlog()->force_sync(std::move(save_log_event_finish), "do_outbound_message_impl"); state->message->set_log_event_id(log_event_id); } else { LOG(INFO) << "Outbound secret message [save_log_event] skip " << tag("log_event_id", log_event_id); @@ -1329,7 +1329,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptrbinlog()->force_sync(std::move(save_log_event_finish)); + context_->binlog()->force_sync(std::move(save_log_event_finish), "do_inbound_message_decrypted"); } else { save_log_event_finish.set_value(Unit()); } @@ -1487,7 +1487,7 @@ void SecretChatActor::outbound_resend(uint64 state_id) { "on_outbound_send_message_start"); } }); - context_->binlog()->force_sync(std::move(send_message_start)); + context_->binlog()->force_sync(std::move(send_message_start), "outbound_resend"); } Status SecretChatActor::outbound_rewrite_with_empty(uint64 state_id) { @@ -1668,7 +1668,7 @@ void SecretChatActor::on_outbound_send_message_error(uint64 state_id, Status err } }); if (need_sync) { - context_->binlog()->force_sync(std::move(send_message_start)); + context_->binlog()->force_sync(std::move(send_message_start), "on_outbound_send_message_error"); } else { send_message_start.set_value(Unit()); } @@ -1833,7 +1833,7 @@ Status SecretChatActor::on_update_chat(NetQueryPtr query) { TRY_STATUS(on_update_chat(std::move(config))); if (auth_state_.state == State::WaitRequestResponse) { context_->secret_chat_db()->set_value(auth_state_); - context_->binlog()->force_sync(Promise<>()); + context_->binlog()->force_sync(Promise<>(), "on_update_chat"); } return Status::OK(); } diff --git a/td/telegram/TdDb.cpp b/td/telegram/TdDb.cpp index e8f14c2b4..1a8f19a06 100644 --- a/td/telegram/TdDb.cpp +++ b/td/telegram/TdDb.cpp @@ -449,7 +449,7 @@ Status TdDb::init_sqlite(const Parameters ¶meters, const DbKey &key, const D binlog_pmc.erase("invalidate_old_featured_sticker_sets"); binlog_pmc.erase(AttachMenuManager::get_attach_menu_bots_database_key()); } - binlog_pmc.force_sync({}); + binlog_pmc.force_sync(Auto(), "init_sqlite"); TRY_STATUS(db.exec("COMMIT TRANSACTION")); @@ -535,7 +535,7 @@ void TdDb::open_impl(Parameters parameters, Promise &&promise) { sqlite_key = string(32, ' '); Random::secure_bytes(sqlite_key); binlog_pmc->set("sqlite_key", sqlite_key); - binlog_pmc->force_sync(Auto()); + binlog_pmc->force_sync(Auto(), "TdDb::open_impl 1"); } new_sqlite_key = DbKey::raw_key(std::move(sqlite_key)); } else { @@ -561,7 +561,7 @@ void TdDb::open_impl(Parameters parameters, Promise &&promise) { } if (drop_sqlite_key) { binlog_pmc->erase("sqlite_key"); - binlog_pmc->force_sync(Auto()); + binlog_pmc->force_sync(Auto(), "TdDb::open_impl 2"); } VLOG(td_init) << "Create concurrent_binlog_pmc"; diff --git a/tddb/td/db/BinlogKeyValue.h b/tddb/td/db/BinlogKeyValue.h index 20f802df4..a43c1ba8f 100644 --- a/tddb/td/db/BinlogKeyValue.h +++ b/tddb/td/db/BinlogKeyValue.h @@ -213,8 +213,8 @@ class BinlogKeyValue final : public KeyValueSyncInterface { return it->second.first; } - void force_sync(Promise<> &&promise) final { - binlog_->force_sync(std::move(promise)); + void force_sync(Promise<> &&promise, const char *source) final { + binlog_->force_sync(std::move(promise), source); } void lazy_sync(Promise<> &&promise) { @@ -288,14 +288,14 @@ inline void BinlogKeyValue::add_event(uint64 seq_no, BufferSlice &&event } template <> -inline void BinlogKeyValue::force_sync(Promise<> &&promise) { - binlog_->sync(); +inline void BinlogKeyValue::force_sync(Promise<> &&promise, const char *source) { + binlog_->sync(source); promise.set_value(Unit()); } template <> inline void BinlogKeyValue::lazy_sync(Promise<> &&promise) { - force_sync(std::move(promise)); + force_sync(std::move(promise), "lazy_sync"); } } // namespace td diff --git a/tddb/td/db/KeyValueSyncInterface.h b/tddb/td/db/KeyValueSyncInterface.h index d4e43ffef..2e68e3f7f 100644 --- a/tddb/td/db/KeyValueSyncInterface.h +++ b/tddb/td/db/KeyValueSyncInterface.h @@ -48,7 +48,7 @@ class KeyValueSyncInterface { virtual void erase_by_prefix(Slice prefix) = 0; - virtual void force_sync(Promise<> &&promise) = 0; + virtual void force_sync(Promise<> &&promise, const char *source) = 0; virtual void close(Promise<> promise) = 0; }; diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index a6221b8d7..76ca502a9 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -286,9 +286,9 @@ Status Binlog::close(bool need_sync) { return Status::OK(); } if (need_sync) { - sync(); + sync("close"); } else { - flush(); + flush("close"); } fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure(); @@ -373,7 +373,7 @@ void Binlog::do_event(BinlogEvent &&event) { LOG(INFO) << "Load: init encryption"; } else { CHECK(state_ == State::Reindex); - flush(); + flush("do_event"); update_write_encryption(); //LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption"; } @@ -404,19 +404,21 @@ void Binlog::do_event(BinlogEvent &&event) { fd_size_ += event_size; } -void Binlog::sync() { - flush(); +void Binlog::sync(const char *source) { + flush(source); if (need_sync_) { + LOG(INFO) << "Sync binlog from " << source; auto status = fd_.sync(); LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status; need_sync_ = false; } } -void Binlog::flush() { +void Binlog::flush(const char *source) { if (state_ == State::Load) { return; } + LOG(DEBUG) << "Flush binlog from " << source; flush_events_buffer(true); // NB: encryption happens during flush if (byte_flow_flag_) { @@ -448,7 +450,7 @@ void Binlog::lazy_flush() { buffer_reader_.sync_with_writer(); auto size = buffer_reader_.size() + events_buffer_size; if (size > (1 << 14)) { - flush(); + flush("lazy_flush"); } else if (size > 0 && need_flush_since_ == 0) { need_flush_since_ = Time::now_cached(); } @@ -660,7 +662,7 @@ void Binlog::do_reindex() { do_event(std::move(event)); // NB: no move is actually happens }); { - flush(); + flush("do_reindex"); if (start_size != 0) { // must sync creation of the file if it is non-empty auto status = fd_.sync_barrier(); LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status; diff --git a/tddb/td/db/binlog/Binlog.h b/tddb/td/db/binlog/Binlog.h index 14cb25517..82b8276a2 100644 --- a/tddb/td/db/binlog/Binlog.h +++ b/tddb/td/db/binlog/Binlog.h @@ -109,8 +109,8 @@ class Binlog { } void add_event(BinlogEvent &&event); - void sync(); - void flush(); + void sync(const char *source); + void flush(const char *source); void lazy_flush(); double need_flush_since() const { return need_flush_since_; diff --git a/tddb/td/db/binlog/BinlogInterface.h b/tddb/td/db/binlog/BinlogInterface.h index 37c6cdc88..c6d6b9664 100644 --- a/tddb/td/db/binlog/BinlogInterface.h +++ b/tddb/td/db/binlog/BinlogInterface.h @@ -74,7 +74,7 @@ class BinlogInterface { return seq_no; } - virtual void force_sync(Promise<> promise) = 0; + virtual void force_sync(Promise<> promise, const char *source) = 0; virtual void force_flush() = 0; virtual void change_key(DbKey db_key, Promise<> promise) = 0; diff --git a/tddb/td/db/binlog/ConcurrentBinlog.cpp b/tddb/td/db/binlog/ConcurrentBinlog.cpp index 7ae0df269..329e08d30 100644 --- a/tddb/td/db/binlog/ConcurrentBinlog.cpp +++ b/tddb/td/db/binlog/ConcurrentBinlog.cpp @@ -61,7 +61,8 @@ class BinlogActor final : public Actor { try_flush(); } - void force_sync(Promise<> &&promise) { + void force_sync(Promise<> &&promise, const char *source) { + LOG(INFO) << "Force binlog sync from " << source; auto seq_no = processor_.max_unfinished_seq_no(); if (processor_.max_finished_seq_no() == seq_no) { do_immediate_sync(std::move(promise)); @@ -72,7 +73,7 @@ class BinlogActor final : public Actor { void force_flush() { // TODO: use same logic as in force_sync - binlog_->flush(); + binlog_->flush("force_flush"); flush_flag_ = false; } @@ -115,7 +116,7 @@ class BinlogActor final : public Actor { auto need_flush_since = binlog_->need_flush_since(); auto now = Time::now_cached(); if (now > need_flush_since + FLUSH_TIMEOUT - 1e-9) { - binlog_->flush(); + binlog_->flush("try_flush"); } else { if (!force_sync_flag_) { flush_flag_ = true; @@ -161,7 +162,7 @@ class BinlogActor final : public Actor { flush_flag_ = false; wakeup_at_ = 0; if (need_sync) { - binlog_->sync(); + binlog_->sync("timeout_expired"); // LOG(ERROR) << "BINLOG SYNC"; set_promises(sync_promises_); } else if (need_flush) { @@ -205,12 +206,14 @@ void ConcurrentBinlog::add_raw_event_impl(uint64 event_id, BufferSlice &&raw_eve send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, event_id, std::move(raw_event), std::move(promise), info); } -void ConcurrentBinlog::force_sync(Promise<> promise) { - send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise)); +void ConcurrentBinlog::force_sync(Promise<> promise, const char *source) { + send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise), source); } + void ConcurrentBinlog::force_flush() { send_closure(binlog_actor_, &detail::BinlogActor::force_flush); } + void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) { send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise)); } diff --git a/tddb/td/db/binlog/ConcurrentBinlog.h b/tddb/td/db/binlog/ConcurrentBinlog.h index 464ddb22d..321e2ebd5 100644 --- a/tddb/td/db/binlog/ConcurrentBinlog.h +++ b/tddb/td/db/binlog/ConcurrentBinlog.h @@ -41,7 +41,7 @@ class ConcurrentBinlog final : public BinlogInterface { ConcurrentBinlog &operator=(ConcurrentBinlog &&) = delete; ~ConcurrentBinlog() final; - void force_sync(Promise<> promise) final; + void force_sync(Promise<> promise, const char *source) final; void force_flush() final; void change_key(DbKey db_key, Promise<> promise) final; diff --git a/test/secret.cpp b/test/secret.cpp index 8d3b2fae2..be3274d0a 100644 --- a/test/secret.cpp +++ b/test/secret.cpp @@ -364,7 +364,7 @@ class FakeBinlog final FakeBinlog() { register_actor("FakeBinlog", this).release(); } - void force_sync(Promise<> promise) final { + void force_sync(Promise<> promise, const char *source) final { if (pending_events_.empty()) { pending_events_.emplace_back(); } @@ -639,7 +639,7 @@ class Master final : public Actor { if (binlog_generation != binlog_generation_) { return promise.set_error(Status::Error("Binlog generation mismatch")); } - binlog_->force_sync(std::move(promise)); + binlog_->force_sync(std::move(promise), "sync_binlog"); } void on_closed() { LOG(INFO) << "CLOSED";