From c44cd3415c601f2fa248f3e58da35d213aa96051 Mon Sep 17 00:00:00 2001 From: levlam Date: Sun, 2 Aug 2020 22:07:22 +0300 Subject: [PATCH] Move QtsManager to UpdatesManager. GitOrigin-RevId: 792faddd71cb3f9c07a4fd915ca782bfe2606ac3 --- td/telegram/SecretChatActor.cpp | 16 ++-- td/telegram/SecretChatsManager.cpp | 107 ++----------------------- td/telegram/SecretChatsManager.h | 15 +--- td/telegram/Td.cpp | 8 -- td/telegram/Td.h | 2 - td/telegram/UpdatesManager.cpp | 73 ++++++++++++----- td/telegram/UpdatesManager.h | 12 ++- td/telegram/logevent/SecretChatEvent.h | 6 +- test/secret.cpp | 2 +- 9 files changed, 81 insertions(+), 160 deletions(-) diff --git a/td/telegram/SecretChatActor.cpp b/td/telegram/SecretChatActor.cpp index a274c8fe8..55f07ef8f 100644 --- a/td/telegram/SecretChatActor.cpp +++ b/td/telegram/SecretChatActor.cpp @@ -153,7 +153,7 @@ void SecretChatActor::replay_create_chat(unique_ptr void SecretChatActor::add_inbound_message(unique_ptr message) { SCOPE_EXIT { if (message) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); } }; if (close_flag_) { @@ -877,7 +877,7 @@ Result> SecretChatActor::decrypt(BufferSl Status SecretChatActor::do_inbound_message_encrypted(unique_ptr message) { SCOPE_EXIT { if (message) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); } }; TRY_RESULT(decrypted, decrypt(message->encrypted_message)); @@ -969,13 +969,13 @@ Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_la Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr message) { SCOPE_EXIT { - CHECK(message == nullptr || !message->qts_ack); + CHECK(message == nullptr || !message->promise); }; auto in_seq_no = message->decrypted_message_layer->in_seq_no_; auto out_seq_no = message->decrypted_message_layer->out_seq_no_; auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer()); if (status.is_error() && status.code() != 2 /* not gap found */) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); if (message->logevent_id()) { LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id()); binlog_erase(context_->binlog(), message->logevent_id()); @@ -1010,14 +1010,14 @@ Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr(action_resend->start_seq_no_ / 2); uint32 finish_seq_no = static_cast(action_resend->end_seq_no_ / 2); if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages"); } LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no); for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) { auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no); if (it == out_seq_no_to_outbound_message_state_token_.end()) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no)); } auto state_id = it->second; @@ -1198,7 +1198,7 @@ void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptrlogevent_id(); // qts - auto qts_promise = std::move(message->qts_ack); + auto qts_promise = std::move(message->promise); if (logevent_id == 0) { message->is_pending = true; @@ -1276,7 +1276,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptrqts_ack); + auto qts_promise = std::move(message->promise); // process message tl_object_ptr file; diff --git a/td/telegram/SecretChatsManager.cpp b/td/telegram/SecretChatsManager.cpp index 41695c6a2..5925b5335 100644 --- a/td/telegram/SecretChatsManager.cpp +++ b/td/telegram/SecretChatsManager.cpp @@ -43,18 +43,6 @@ namespace td { -// qts and seq_no -// Each EncryptedMessage (update_message) has qts. -// Such updates must be handled in order of qts -// -// Qts should be handled on level of SecretChatsManager -// 1. Each update can be received by SecretChatsManager multiple times. -// 2. Each update should be sent to SecretChatActor only once. (Though SecretChatActor mustn't rely it) -// 3. Updates must be send in order of qts, without gaps. -// 4. SecretChatActor must notify SecretChatManager when update is processed (saved in database) -// 5. Only after all updates <= qts are processed by SecretChatActor, UpdatesManager should be -// notified about new qts. -// // seq_no // 1. // x_in = 0 if we initiated secret chat. @@ -94,12 +82,6 @@ void SecretChatsManager::start_up() { dummy_mode_ = true; return; } - // TODO: use database wrapper - auto pmc = G()->td_db()->get_binlog_pmc(); - auto qts_str = pmc->get("updates.qts"); - if (!qts_str.empty()) { - init_qts(to_integer(qts_str)); - } class StateCallback : public StateManager::Callback { public: @@ -116,25 +98,6 @@ void SecretChatsManager::start_up() { send_closure(G()->state_manager(), &StateManager::add_callback, make_unique(actor_id(this))); } -void SecretChatsManager::init_qts(int qts) { - if (dummy_mode_ || close_flag_) { - return; - } - has_qts_ = true; - qts_manager_.init(qts); - LOG(INFO) << "Init secret chats qts " << tag("qts", qts); -} - -void SecretChatsManager::update_qts(int qts) { - if (dummy_mode_ || close_flag_ || qts < 0) { - return; - } - LOG(INFO) << "Update qts to " << qts; - add_qts(qts).set_value(Unit()); - has_qts_ = true; - LOG(INFO) << "Update secret chats qts " << tag("qts", qts); -} - void SecretChatsManager::create_chat(int32 user_id, int64 user_access_hash, Promise promise) { int32 random_id; ActorId actor; @@ -202,14 +165,6 @@ void SecretChatsManager::send_set_ttl_message(SecretChatId secret_chat_id, int32 send_closure(actor, &SecretChatActor::send_set_ttl_message, ttl, random_id, std::move(safe_promise)); } -void SecretChatsManager::before_get_difference(int32 qts) { - if (dummy_mode_ || close_flag_) { - return; - } - last_get_difference_qts_ = qts; - // We will receive all updates later than qts anyway. -} - void SecretChatsManager::on_update_chat(tl_object_ptr update) { if (dummy_mode_ || close_flag_) { return; @@ -228,47 +183,22 @@ void SecretChatsManager::do_update_chat(tl_object_ptrchat_)); } -void SecretChatsManager::on_update_message(tl_object_ptr update, - bool force_apply) { +void SecretChatsManager::on_new_message(tl_object_ptr &&message_ptr, + Promise &&promise) { if (dummy_mode_ || close_flag_) { return; } - // UpdatesManager MUST postpone updates during GetDifference - auto qts = update->qts_; - if (!force_apply) { - if (!has_qts_) { - LOG(INFO) << "Got update, don't know current qts. Force get_difference"; - force_get_difference(); - return; - } - if (qts <= last_get_difference_qts_) { - LOG(WARNING) << "Got updates with " << tag("qts", qts) << " lower or equal than " - << tag("last get difference qts", last_get_difference_qts_); - force_get_difference(); - return; - } - auto mem_qts = qts_manager_.mem_pts(); - if (qts <= mem_qts) { - LOG(WARNING) << "Duplicated update " << tag("qts", qts) << tag("mem_qts", mem_qts); - return; - } - if (qts != mem_qts + 1) { - LOG(WARNING) << "Got gap in qts " << mem_qts << " ... " << qts; - force_get_difference(); - // TODO wait 1 second? - return; - } - } + CHECK(message_ptr != nullptr); auto event = make_unique(); - event->qts_ack = add_qts(qts); - downcast_call(*update->message_, [&](auto &x) { + event->promise = std::move(promise); + downcast_call(*message_ptr, [&](auto &x) { event->chat_id = x.chat_id_; event->date = x.date_; event->encrypted_message = std::move(x.bytes_); }); - if (update->message_->get_id() == telegram_api::encryptedMessage::ID) { - auto message = move_tl_object_as(update->message_); + if (message_ptr->get_id() == telegram_api::encryptedMessage::ID) { + auto message = move_tl_object_as(message_ptr); if (message->file_->get_id() == telegram_api::encryptedFile::ID) { auto file = move_tl_object_as(message->file_); @@ -284,11 +214,6 @@ void SecretChatsManager::on_update_message(tl_object_ptr SecretChatsManager::add_qts(int32 qts) { - auto id = qts_manager_.add_pts(qts); - return PromiseCreator::event(self_closure(this, &SecretChatsManager::on_qts_ack, id)); -} - void SecretChatsManager::replay_binlog_event(BinlogEvent &&binlog_event) { if (dummy_mode_) { binlog_erase(G()->td_db()->get_binlog(), binlog_event.id_); @@ -357,11 +282,6 @@ void SecretChatsManager::replay_outbound_message(unique_ptrtd(), &Td::force_get_difference); -} - ActorId SecretChatsManager::get_chat_actor(int32 id) { return create_chat_actor_impl(id, false); } @@ -501,19 +421,6 @@ ActorId SecretChatsManager::create_chat_actor_impl(int32 id, bo } } -void SecretChatsManager::on_qts_ack(PtsManager::PtsId qts_ack_token) { - auto old_qts = qts_manager_.db_pts(); - auto new_qts = qts_manager_.finish(qts_ack_token); - if (old_qts != new_qts) { - save_qts(); - } -} - -void SecretChatsManager::save_qts() { - LOG(INFO) << "Save " << tag("qts", qts_manager_.db_pts()); - send_closure(G()->td(), &Td::update_qts, qts_manager_.db_pts()); -} - void SecretChatsManager::hangup() { close_flag_ = true; if (dummy_mode_) { diff --git a/td/telegram/SecretChatsManager.h b/td/telegram/SecretChatsManager.h index 904cdde1b..2e052fadb 100644 --- a/td/telegram/SecretChatsManager.h +++ b/td/telegram/SecretChatsManager.h @@ -29,16 +29,11 @@ struct BinlogEvent; class SecretChatsManager : public Actor { public: explicit SecretChatsManager(ActorShared<> parent); - void init_qts(int32 qts); - void update_qts(int32 qts); - // we can forget all pending_updates after start_get_difference they will be received after this point anyway - // It is not necessary, but it will help. - void before_get_difference(int32 qts); // Proxy query to corrensponding SecretChatActor. // Look for more info in SecretChatActor.h void on_update_chat(tl_object_ptr update); - void on_update_message(tl_object_ptr update, bool force_apply); + void on_new_message(tl_object_ptr &&message_ptr, Promise &&promise); void create_chat(int32 user_id, int64 user_access_hash, Promise promise); void cancel_chat(SecretChatId, Promise<> promise); @@ -60,13 +55,9 @@ class SecretChatsManager : public Actor { bool binlog_replay_finish_flag_ = false; bool dummy_mode_ = false; bool close_flag_ = false; - bool has_qts_ = false; ActorShared<> parent_; std::map> id_to_actor_; - PtsManager qts_manager_; - int32 last_get_difference_qts_ = -1; - bool is_online_{false}; std::vector>> pending_chat_updates_; @@ -83,10 +74,6 @@ class SecretChatsManager : public Actor { ActorId get_chat_actor(int32 id); ActorId create_chat_actor(int32 id); ActorId create_chat_actor_impl(int32 id, bool can_be_empty); - Promise<> add_qts(int32 qts); - void on_qts_ack(PtsManager::PtsId qts_ack_token); - void save_qts(); - void force_get_difference(); void start_up() override; void hangup() override; diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 0b3668d08..b6b94f653 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -3520,14 +3520,6 @@ void Td::send(NetQueryPtr &&query) { G()->net_query_dispatcher().dispatch(std::move(query)); } -void Td::update_qts(int32 qts) { - if (close_flag_ > 1) { - return; - } - - updates_manager_->set_qts(qts); -} - void Td::force_get_difference() { if (close_flag_) { return; diff --git a/td/telegram/Td.h b/td/telegram/Td.h index c1ec3fef7..dc84aae5e 100644 --- a/td/telegram/Td.h +++ b/td/telegram/Td.h @@ -105,8 +105,6 @@ class Td final : public NetQueryCallback { void destroy(); void close(); - void update_qts(int32 qts); - void force_get_difference(); void schedule_get_terms_of_service(int32 expires_in); diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 9066be3da..7085fe89e 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -227,9 +227,7 @@ void UpdatesManager::before_get_difference(bool is_initial) { send_closure(G()->state_manager(), &StateManager::on_synchronized, false); td_->messages_manager_->before_get_difference(); - if (!is_initial) { - send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts()); - } + send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } @@ -238,6 +236,11 @@ Promise<> UpdatesManager::add_pts(int32 pts) { return PromiseCreator::event(self_closure(this, &UpdatesManager::on_pts_ack, id)); } +Promise<> UpdatesManager::add_qts(int32 qts) { + auto id = qts_manager_.add_pts(qts); + return PromiseCreator::event(self_closure(this, &UpdatesManager::on_qts_ack, id)); +} + void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) { auto old_pts = pts_manager_.db_pts(); auto new_pts = pts_manager_.finish(ack_token); @@ -246,6 +249,14 @@ void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) { } } +void UpdatesManager::on_qts_ack(PtsManager::PtsId ack_token) { + auto old_qts = qts_manager_.db_pts(); + auto new_qts = qts_manager_.finish(ack_token); + if (old_qts != new_qts) { + save_qts(new_qts); + } +} + void UpdatesManager::save_pts(int32 pts) { if (pts == std::numeric_limits::max()) { G()->td_db()->get_binlog_pmc()->erase("updates.pts"); @@ -254,6 +265,12 @@ void UpdatesManager::save_pts(int32 pts) { } } +void UpdatesManager::save_qts(int32 qts) { + if (!G()->ignore_backgrond_updates()) { + G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts)); + } +} + Promise<> UpdatesManager::set_pts(int32 pts, const char *source) { if (pts == std::numeric_limits::max()) { LOG(WARNING) << "Update pts from " << get_pts() << " to -1 from " << source; @@ -281,17 +298,20 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) { return result; } -void UpdatesManager::set_qts(int32 qts) { - if (qts > qts_) { - LOG(INFO) << "Update qts to " << qts; - - qts_ = qts; - if (!G()->ignore_backgrond_updates()) { - G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts)); +Promise<> UpdatesManager::set_qts(int32 qts) { + Promise<> result; + if (qts > get_qts() || (0 < qts && qts < get_qts() - 399999)) { // qts can only go up or drop cardinally + if (qts < get_qts() - 399999) { + LOG(WARNING) << "Qts decreases from " << get_qts() << " to " << qts; + } else { + LOG(INFO) << "Update qts from " << get_qts() << " to " << qts; } - } else if (qts < qts_) { - LOG(ERROR) << "Receive wrong qts = " << qts << ". Current qts = " << qts_; + + result = add_qts(qts); + } else if (qts < get_qts()) { + LOG(ERROR) << "Receive wrong qts = " << qts << " less than current qts = " << get_qts(); } + return result; } void UpdatesManager::set_date(int32 date, bool from_update, string date_source) { @@ -791,7 +811,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptrpts_, full_source.c_str()).set_value(Unit()); set_date(state->date_, false, std::move(full_source)); - // set_qts(state->qts_); + // set_qts(state->qts_).set_value(Unit()); seq_ = state->seq_; } @@ -952,11 +972,10 @@ void UpdatesManager::init_state() { } pts_manager_.init(to_integer(pts_str)); last_get_difference_pts_ = get_pts(); - qts_ = to_integer(pmc->get("updates.qts")); + qts_manager_.init(to_integer(pmc->get("updates.qts"))); date_ = to_integer(pmc->get("updates.date")); date_source_ = "database"; - LOG(DEBUG) << "Init: " << get_pts() << " " << qts_ << " " << date_; - send_closure(td_->secret_chats_manager_, &SecretChatsManager::init_qts, qts_); + LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_; get_difference("init_state"); } @@ -1014,11 +1033,13 @@ void UpdatesManager::process_get_difference_updates( } for (auto &encrypted_message : new_encrypted_messages) { - on_update(make_tl_object(std::move(encrypted_message), 0), true); + send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(encrypted_message), + Promise()); } - send_closure(td_->secret_chats_manager_, &SecretChatsManager::update_qts, qts); process_updates(std::move(other_updates), true); + + set_qts(qts).set_value(Unit()); } void UpdatesManager::on_get_difference(tl_object_ptr &&difference_ptr) { @@ -1060,7 +1081,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr(difference_ptr); if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits::max() && - difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == qts_) { + difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) { // TODO send new getDifference request and apply difference slice only after that } @@ -1915,7 +1936,19 @@ void UpdatesManager::on_update(tl_object_ptr upd } void UpdatesManager::on_update(tl_object_ptr update, bool force_apply) { - send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_update_message, std::move(update), force_apply); + if (!force_apply) { + if (update->qts_ <= get_qts()) { + LOG(INFO) << "Ignore already processed update with qts " << update->qts_; + return; + } + if (update->qts_ != get_qts() + 1) { + // TODO fill gap + return; + } + } + + send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_), + add_qts(update->qts_)); } void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 60dfca28d..4185d8c7c 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -63,7 +63,7 @@ class UpdatesManager : public Actor { return pts_manager_.mem_pts(); } int32 get_qts() const { - return qts_; + return qts_manager_.mem_pts(); } int32 get_date() const { return date_; @@ -71,7 +71,7 @@ class UpdatesManager : public Actor { Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT; - void set_qts(int32 qts); + Promise<> set_qts(int32 qts) TD_WARN_UNUSED_RESULT; static const double MAX_UNFILLED_GAP_TIME; @@ -102,7 +102,7 @@ class UpdatesManager : public Actor { ActorShared<> parent_; PtsManager pts_manager_; - int32 qts_ = 0; + PtsManager qts_manager_; int32 date_ = 0; int32 seq_ = 0; string date_source_ = "nowhere"; @@ -126,6 +126,10 @@ class UpdatesManager : public Actor { void on_pts_ack(PtsManager::PtsId ack_token); void save_pts(int32 pts); + Promise<> add_qts(int32 qts); + void on_qts_ack(PtsManager::PtsId ack_token); + void save_qts(int32 qts); + void set_date(int32 date, bool from_update, string date_source); int32 get_short_update_date() const; @@ -260,7 +264,7 @@ class UpdatesManager : public Actor { void on_update(tl_object_ptr update, bool /*force_apply*/); void on_update(tl_object_ptr update, bool /*force_apply*/); - void on_update(tl_object_ptr update, bool /*force_apply*/); + void on_update(tl_object_ptr update, bool force_apply); void on_update(tl_object_ptr update, bool /*force_apply*/); void on_update(tl_object_ptr update, bool /*force_apply*/); diff --git a/td/telegram/logevent/SecretChatEvent.h b/td/telegram/logevent/SecretChatEvent.h index 6c9e11c50..c01e86993 100644 --- a/td/telegram/logevent/SecretChatEvent.h +++ b/td/telegram/logevent/SecretChatEvent.h @@ -211,7 +211,7 @@ class InboundSecretMessage : public SecretChatLogEventBase int32 date = 0; BufferSlice encrypted_message; // empty when we store event to binlog - Promise qts_ack; + Promise promise; bool is_checked = false; // after decrypted and checked @@ -245,7 +245,7 @@ class InboundSecretMessage : public SecretChatLogEventBase store(chat_id, storer); store(date, storer); // skip encrypted_message - // skip qts_ack + // skip promise // TODO decrypted_message_layer->store(storer); @@ -278,7 +278,7 @@ class InboundSecretMessage : public SecretChatLogEventBase parse(chat_id, parser); parse(date, parser); // skip encrypted_message - // skip qts_ack + // skip promise // TODO decrypted_message_layer = secret_api::decryptedMessageLayer::fetch(parser); diff --git a/test/secret.cpp b/test/secret.cpp index 6bb068bce..1a7f94cd2 100644 --- a/test/secret.cpp +++ b/test/secret.cpp @@ -591,7 +591,7 @@ class Master : public Actor { event->chat_id = chat_id; event->date = 0; event->encrypted_message = std::move(data); - event->qts_ack = PromiseCreator::lambda( + event->promise = PromiseCreator::lambda( [actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable { if (result.is_ok()) { LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc);