Fast PTS gap repair.

This commit is contained in:
levlam 2023-04-27 22:01:10 +03:00
parent b34955e7a9
commit 797156bf11
2 changed files with 139 additions and 6 deletions

View File

@ -176,9 +176,7 @@ class GetDifferenceQuery final : public Td::ResultHandler {
}
void on_error(Status status) final {
if (!G()->is_expected_error(status)) {
promise_.set_error(std::move(status));
}
promise_.set_error(std::move(status));
}
};
@ -205,6 +203,35 @@ class ConfirmPtsQtsQuery final : public Td::ResultHandler {
}
};
class GetPtsUpdateQuery final : public Td::ResultHandler {
Promise<tl_object_ptr<telegram_api::updates_Difference>> promise_;
public:
explicit GetPtsUpdateQuery(Promise<tl_object_ptr<telegram_api::updates_Difference>> &&promise)
: promise_(std::move(promise)) {
}
void send(int32 pts) {
int32 flags =
telegram_api::updates_getDifference::PTS_LIMIT_MASK | telegram_api::updates_getDifference::QTS_LIMIT_MASK;
send_query(G()->net_query_creator().create(
telegram_api::updates_getDifference(flags, pts, 1, 0, std::numeric_limits<int32>::max(), 0, 0)));
}
void on_result(BufferSlice packet) final {
auto result_ptr = fetch_result<telegram_api::updates_getDifference>(packet);
if (result_ptr.is_error()) {
return on_error(result_ptr.move_as_error());
}
promise_.set_value(result_ptr.move_as_ok());
}
void on_error(Status status) final {
promise_.set_error(std::move(status));
}
};
UpdatesManager::UpdatesManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
last_pts_save_time_ = last_qts_save_time_ = Time::now() - 2 * MAX_PTS_SAVE_DELAY;
@ -264,6 +291,33 @@ ActorShared<UpdatesManager> UpdatesManager::create_reference() {
return actor_shared(this, 1);
}
void UpdatesManager::check_pts_gap(void *td) {
if (G()->close_flag()) {
return;
}
CHECK(td != nullptr);
static_cast<Td *>(td)->updates_manager_.get()->repair_pts_gap();
}
void UpdatesManager::repair_pts_gap() {
if (running_get_difference_ || !postponed_pts_updates_.empty()) {
return;
}
auto pts = get_pts() + 1;
if (pending_pts_updates_.empty() || pending_pts_updates_.begin()->first != pts + 1) {
return;
}
VLOG(get_difference) << "Fetch update with PTS = " << pts;
auto promise =
PromiseCreator::lambda([pts](Result<telegram_api::object_ptr<telegram_api::updates_Difference>> result) {
if (result.is_ok()) {
send_closure(G()->updates_manager(), &UpdatesManager::on_get_pts_update, pts, result.move_as_ok());
}
});
td_->create_handler<GetPtsUpdateQuery>(std::move(promise))->send(pts - 1);
}
void UpdatesManager::fill_pts_gap(void *td) {
if (G()->close_flag()) {
return;
@ -1717,8 +1771,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
VLOG(get_difference) << "----- END GET DIFFERENCE-----";
running_get_difference_ = false;
if (!td_->auth_manager_->is_authorized()) {
// just in case
if (G()->close_flag()) {
return;
}
@ -1839,6 +1892,70 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
}
}
void UpdatesManager::on_get_pts_update(int32 pts,
telegram_api::object_ptr<telegram_api::updates_Difference> difference_ptr) {
if (G()->close_flag()) {
return;
}
if (get_pts() != pts - 1 || running_get_difference_ || !postponed_pts_updates_.empty() ||
pending_pts_updates_.empty() || pending_pts_updates_.begin()->first != pts + 1) {
return;
}
LOG(DEBUG) << "Receive update with PTS " << pts << ": " << to_string(difference_ptr);
switch (difference_ptr->get_id()) {
case telegram_api::updates_differenceSlice::ID: {
auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
if (have_update_pts_changed(difference->other_updates_)) {
return;
}
td_->contacts_manager_->on_get_users(std::move(difference->users_), "on_get_pts_update");
td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "on_get_pts_update");
for (auto &message : difference->new_messages_) {
difference->other_updates_.push_back(
telegram_api::make_object<telegram_api::updateNewMessage>(std::move(message), pts, 1));
}
telegram_api::object_ptr<telegram_api::Update> *update_ptr = nullptr;
size_t update_count = 0;
for (auto &update : difference->other_updates_) {
auto constructor_id = update->get_id();
if (constructor_id == telegram_api::updateMessageID::ID) {
// in getDifference updateMessageID can't be received for scheduled messages
LOG(INFO) << "Receive update about sent message " << to_string(update);
auto update_message_id = move_tl_object_as<telegram_api::updateMessageID>(update);
td_->messages_manager_->on_update_message_id(
update_message_id->random_id_, MessageId(ServerMessageId(update_message_id->id_)), "on_get_pts_update");
continue;
}
update_ptr = &update;
update_count++;
}
if (!difference->new_encrypted_messages_.empty() || update_count != 1) {
LOG(ERROR) << "Receive unexpected updates with PTS " << pts << ": " << to_string(difference_ptr);
break;
}
CHECK(update_ptr != nullptr);
LOG(WARNING) << "Repair update with PTS " << pts;
add_pending_pts_update(std::move(*update_ptr), pts, 1, Time::now(), Promise<Unit>(), "on_get_pts_update");
break;
}
case telegram_api::updates_differenceEmpty::ID:
case telegram_api::updates_difference::ID:
case telegram_api::updates_differenceTooLong::ID: {
LOG(ERROR) << "Receive " << to_string(difference_ptr);
break;
default:
UNREACHABLE();
}
}
}
void UpdatesManager::confirm_pts_qts(int32 qts) {
int32 pts = get_pts();
if (pts < 0) {
@ -2561,7 +2678,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
return promise.set_value(Unit());
}
if (DROP_PTS_UPDATES) {
if (DROP_PTS_UPDATES && Slice(source) != Slice("on_get_pts_update")) {
set_pts_gap_timeout(1.0);
return promise.set_value(Unit());
}
@ -2785,6 +2902,7 @@ void UpdatesManager::process_all_pending_pts_updates() {
void UpdatesManager::drop_all_pending_pts_updates() {
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
min_pts_gap_timeout_.cancel_timeout();
pts_gap_timeout_.cancel_timeout();
pending_pts_updates_.clear();
}
@ -2906,6 +3024,7 @@ void UpdatesManager::process_pending_pts_updates() {
pending_pts_updates_.erase(update_it);
}
if (applied_update_count > 0) {
min_pts_gap_timeout_.cancel_timeout();
pts_gap_timeout_.cancel_timeout();
}
if (!pending_pts_updates_.empty()) {
@ -3042,6 +3161,12 @@ void UpdatesManager::process_pending_qts_updates() {
void UpdatesManager::set_pts_gap_timeout(double timeout) {
if (!pts_gap_timeout_.has_timeout() || timeout < pts_gap_timeout_.get_timeout()) {
if (timeout > 2 * MIN_UNFILLED_GAP_TIME) {
min_pts_gap_timeout_.set_callback(std::move(check_pts_gap));
min_pts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
min_pts_gap_timeout_.set_timeout_in(MIN_UNFILLED_GAP_TIME);
}
pts_gap_timeout_.set_callback(std::move(fill_pts_gap));
pts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
pts_gap_timeout_.set_timeout_in(timeout);

View File

@ -148,6 +148,7 @@ class UpdatesManager final : public Actor {
private:
static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000;
static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20;
static constexpr double MIN_UNFILLED_GAP_TIME = 0.1;
static constexpr double MAX_UNFILLED_GAP_TIME = 0.7;
static constexpr double MAX_PTS_SAVE_DELAY = 0.05;
static constexpr double UPDATE_APPLY_WARNING_TIME = 0.25;
@ -234,6 +235,7 @@ class UpdatesManager final : public Actor {
std::map<int32, PendingQtsUpdate> pending_qts_updates_; // updates with too big QTS
Timeout min_pts_gap_timeout_;
Timeout pts_gap_timeout_;
Timeout seq_gap_timeout_;
@ -350,6 +352,8 @@ class UpdatesManager final : public Actor {
void process_pending_qts_updates();
static void check_pts_gap(void *td);
static void fill_pts_gap(void *td);
static void fill_seq_gap(void *td);
@ -362,6 +366,10 @@ class UpdatesManager final : public Actor {
static void on_pending_audio_transcription_timeout_callback(void *td, int64 transcription_id);
void repair_pts_gap();
void on_get_pts_update(int32 pts, telegram_api::object_ptr<telegram_api::updates_Difference> difference_ptr);
void set_pts_gap_timeout(double timeout);
void set_seq_gap_timeout(double timeout);