From 797156bf11cb0df57cce9fcde0e259cbbb494882 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 27 Apr 2023 22:01:10 +0300 Subject: [PATCH] Fast PTS gap repair. --- td/telegram/UpdatesManager.cpp | 137 +++++++++++++++++++++++++++++++-- td/telegram/UpdatesManager.h | 8 ++ 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index eadea0067..75f411a74 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -176,9 +176,7 @@ class GetDifferenceQuery final : public Td::ResultHandler { } void on_error(Status status) final { - if (!G()->is_expected_error(status)) { - promise_.set_error(std::move(status)); - } + promise_.set_error(std::move(status)); } }; @@ -205,6 +203,35 @@ class ConfirmPtsQtsQuery final : public Td::ResultHandler { } }; +class GetPtsUpdateQuery final : public Td::ResultHandler { + Promise> promise_; + + public: + explicit GetPtsUpdateQuery(Promise> &&promise) + : promise_(std::move(promise)) { + } + + void send(int32 pts) { + int32 flags = + telegram_api::updates_getDifference::PTS_LIMIT_MASK | telegram_api::updates_getDifference::QTS_LIMIT_MASK; + send_query(G()->net_query_creator().create( + telegram_api::updates_getDifference(flags, pts, 1, 0, std::numeric_limits::max(), 0, 0))); + } + + void on_result(BufferSlice packet) final { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(result_ptr.move_as_error()); + } + + promise_.set_value(result_ptr.move_as_ok()); + } + + void on_error(Status status) final { + promise_.set_error(std::move(status)); + } +}; + UpdatesManager::UpdatesManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) { last_pts_save_time_ = last_qts_save_time_ = Time::now() - 2 * MAX_PTS_SAVE_DELAY; @@ -264,6 +291,33 @@ ActorShared UpdatesManager::create_reference() { return actor_shared(this, 1); } +void UpdatesManager::check_pts_gap(void *td) { + if (G()->close_flag()) { + return; + } + + CHECK(td != nullptr); + static_cast(td)->updates_manager_.get()->repair_pts_gap(); +} + +void UpdatesManager::repair_pts_gap() { + if (running_get_difference_ || !postponed_pts_updates_.empty()) { + return; + } + auto pts = get_pts() + 1; + if (pending_pts_updates_.empty() || pending_pts_updates_.begin()->first != pts + 1) { + return; + } + VLOG(get_difference) << "Fetch update with PTS = " << pts; + auto promise = + PromiseCreator::lambda([pts](Result> result) { + if (result.is_ok()) { + send_closure(G()->updates_manager(), &UpdatesManager::on_get_pts_update, pts, result.move_as_ok()); + } + }); + td_->create_handler(std::move(promise))->send(pts - 1); +} + void UpdatesManager::fill_pts_gap(void *td) { if (G()->close_flag()) { return; @@ -1717,8 +1771,7 @@ void UpdatesManager::on_get_difference(tl_object_ptrauth_manager_->is_authorized()) { - // just in case + if (G()->close_flag()) { return; } @@ -1839,6 +1892,70 @@ void UpdatesManager::on_get_difference(tl_object_ptr difference_ptr) { + if (G()->close_flag()) { + return; + } + if (get_pts() != pts - 1 || running_get_difference_ || !postponed_pts_updates_.empty() || + pending_pts_updates_.empty() || pending_pts_updates_.begin()->first != pts + 1) { + return; + } + + LOG(DEBUG) << "Receive update with PTS " << pts << ": " << to_string(difference_ptr); + + switch (difference_ptr->get_id()) { + case telegram_api::updates_differenceSlice::ID: { + auto difference = move_tl_object_as(difference_ptr); + + if (have_update_pts_changed(difference->other_updates_)) { + return; + } + + td_->contacts_manager_->on_get_users(std::move(difference->users_), "on_get_pts_update"); + td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "on_get_pts_update"); + + for (auto &message : difference->new_messages_) { + difference->other_updates_.push_back( + telegram_api::make_object(std::move(message), pts, 1)); + } + telegram_api::object_ptr *update_ptr = nullptr; + size_t update_count = 0; + for (auto &update : difference->other_updates_) { + auto constructor_id = update->get_id(); + if (constructor_id == telegram_api::updateMessageID::ID) { + // in getDifference updateMessageID can't be received for scheduled messages + LOG(INFO) << "Receive update about sent message " << to_string(update); + auto update_message_id = move_tl_object_as(update); + td_->messages_manager_->on_update_message_id( + update_message_id->random_id_, MessageId(ServerMessageId(update_message_id->id_)), "on_get_pts_update"); + continue; + } + update_ptr = &update; + update_count++; + } + + if (!difference->new_encrypted_messages_.empty() || update_count != 1) { + LOG(ERROR) << "Receive unexpected updates with PTS " << pts << ": " << to_string(difference_ptr); + break; + } + + CHECK(update_ptr != nullptr); + LOG(WARNING) << "Repair update with PTS " << pts; + add_pending_pts_update(std::move(*update_ptr), pts, 1, Time::now(), Promise(), "on_get_pts_update"); + break; + } + case telegram_api::updates_differenceEmpty::ID: + case telegram_api::updates_difference::ID: + case telegram_api::updates_differenceTooLong::ID: { + LOG(ERROR) << "Receive " << to_string(difference_ptr); + break; + default: + UNREACHABLE(); + } + } +} + void UpdatesManager::confirm_pts_qts(int32 qts) { int32 pts = get_pts(); if (pts < 0) { @@ -2561,7 +2678,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr return promise.set_value(Unit()); } - if (DROP_PTS_UPDATES) { + if (DROP_PTS_UPDATES && Slice(source) != Slice("on_get_pts_update")) { set_pts_gap_timeout(1.0); return promise.set_value(Unit()); } @@ -2785,6 +2902,7 @@ void UpdatesManager::process_all_pending_pts_updates() { void UpdatesManager::drop_all_pending_pts_updates() { accumulated_pts_count_ = 0; accumulated_pts_ = -1; + min_pts_gap_timeout_.cancel_timeout(); pts_gap_timeout_.cancel_timeout(); pending_pts_updates_.clear(); } @@ -2906,6 +3024,7 @@ void UpdatesManager::process_pending_pts_updates() { pending_pts_updates_.erase(update_it); } if (applied_update_count > 0) { + min_pts_gap_timeout_.cancel_timeout(); pts_gap_timeout_.cancel_timeout(); } if (!pending_pts_updates_.empty()) { @@ -3042,6 +3161,12 @@ void UpdatesManager::process_pending_qts_updates() { void UpdatesManager::set_pts_gap_timeout(double timeout) { if (!pts_gap_timeout_.has_timeout() || timeout < pts_gap_timeout_.get_timeout()) { + if (timeout > 2 * MIN_UNFILLED_GAP_TIME) { + min_pts_gap_timeout_.set_callback(std::move(check_pts_gap)); + min_pts_gap_timeout_.set_callback_data(static_cast(td_)); + min_pts_gap_timeout_.set_timeout_in(MIN_UNFILLED_GAP_TIME); + } + pts_gap_timeout_.set_callback(std::move(fill_pts_gap)); pts_gap_timeout_.set_callback_data(static_cast(td_)); pts_gap_timeout_.set_timeout_in(timeout); diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 64c9446d1..74fece442 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -148,6 +148,7 @@ class UpdatesManager final : public Actor { private: static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000; static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20; + static constexpr double MIN_UNFILLED_GAP_TIME = 0.1; static constexpr double MAX_UNFILLED_GAP_TIME = 0.7; static constexpr double MAX_PTS_SAVE_DELAY = 0.05; static constexpr double UPDATE_APPLY_WARNING_TIME = 0.25; @@ -234,6 +235,7 @@ class UpdatesManager final : public Actor { std::map pending_qts_updates_; // updates with too big QTS + Timeout min_pts_gap_timeout_; Timeout pts_gap_timeout_; Timeout seq_gap_timeout_; @@ -350,6 +352,8 @@ class UpdatesManager final : public Actor { void process_pending_qts_updates(); + static void check_pts_gap(void *td); + static void fill_pts_gap(void *td); static void fill_seq_gap(void *td); @@ -362,6 +366,10 @@ class UpdatesManager final : public Actor { static void on_pending_audio_transcription_timeout_callback(void *td, int64 transcription_id); + void repair_pts_gap(); + + void on_get_pts_update(int32 pts, telegram_api::object_ptr difference_ptr); + void set_pts_gap_timeout(double timeout); void set_seq_gap_timeout(double timeout);