From d29d508b846465dfcb681dcc6b3fed2ee5f41502 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 23 Jun 2022 22:00:48 +0300 Subject: [PATCH] Avoid MultiPromise usage if there is only one update to process. --- td/telegram/UpdatesManager.cpp | 86 +++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 21 deletions(-) diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 28adf34e7..61138ceff 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -63,6 +63,7 @@ #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/Random.h" +#include "td/utils/ScopeGuard.h" #include "td/utils/Slice.h" #include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" @@ -1826,11 +1827,29 @@ void UpdatesManager::on_pending_updates(vector &&result) mutable { - send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise)); - }); - auto lock = mpas.get_promise(); + Promise lock; + auto use_mpas = need_postpone || update_count != 1; + auto get_promise = [&] { + if (use_mpas) { + return mpas.get_promise(); + } else { + CHECK(update_count != 0); + update_count--; + return std::move(promise); + } + }; + if (use_mpas) { + being_processed_updates_++; + mpas.add_promise([actor_id = create_reference(), promise = std::move(promise)](Result &&result) mutable { + send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise)); + }); + lock = get_promise(); + } + SCOPE_EXIT { + if (!use_mpas && update_count == 1) { + promise.set_value(Unit()); + } + }; for (auto &update : updates) { if (update != nullptr) { @@ -1853,11 +1872,11 @@ void UpdatesManager::on_pending_updates(vector(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); update = nullptr; } if (id == telegram_api::updateEncryption::ID) { - on_update(move_tl_object_as(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); update = nullptr; } CHECK(need_postpone || !running_get_difference_); @@ -1873,7 +1892,7 @@ void UpdatesManager::on_pending_updates(vector> Promise &&promise) { tl_object_ptr update_pts_changed; + int32 update_count = 0; + for (auto &update : updates) { + if (update != nullptr) { + update_count++; + } + } + MultiPromiseActorSafe mpas{"OnProcessUpdatesMultiPromiseActor"}; - mpas.add_promise(std::move(promise)); - auto lock = mpas.get_promise(); + Promise lock; + auto use_mpas = update_count != 1; + auto get_promise = [&] { + if (use_mpas) { + return mpas.get_promise(); + } else { + CHECK(update_count != 0); + update_count--; + return std::move(promise); + } + }; + if (use_mpas) { + mpas.add_promise(std::move(promise)); + lock = get_promise(); + } + SCOPE_EXIT { + if (!use_mpas && update_count == 1) { + promise.set_value(Unit()); + } + }; /* for (auto &update : updates) { @@ -2013,7 +2057,7 @@ void UpdatesManager::process_updates(vector> // process updateReadChannelInbox before updateNewChannelMessage auto constructor_id = update->get_id(); if (constructor_id == telegram_api::updateReadChannelInbox::ID) { - on_update(move_tl_object_as(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); } } } @@ -2023,19 +2067,19 @@ void UpdatesManager::process_updates(vector> // process updateNewChannelMessage first auto constructor_id = update->get_id(); if (constructor_id == telegram_api::updateNewChannelMessage::ID) { - on_update(move_tl_object_as(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); continue; } // process updateNewScheduledMessage first if (constructor_id == telegram_api::updateNewScheduledMessage::ID) { - on_update(move_tl_object_as(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); continue; } // updateGroupCallConnection must be processed before updateGroupCall if (constructor_id == telegram_api::updateGroupCallConnection::ID) { - on_update(move_tl_object_as(update), mpas.get_promise()); + on_update(move_tl_object_as(update), get_promise()); continue; } @@ -2065,7 +2109,7 @@ void UpdatesManager::process_updates(vector> process_pts_update(std::move(update)); } else if (is_qts_update(update.get())) { - process_qts_update(std::move(update), 0, mpas.get_promise()); + process_qts_update(std::move(update), 0, get_promise()); } else if (update->get_id() == telegram_api::updateChannelTooLong::ID) { td_->messages_manager_->on_update_channel_too_long( move_tl_object_as(update), true); @@ -2076,12 +2120,12 @@ void UpdatesManager::process_updates(vector> for (auto &update : updates) { if (update != nullptr) { LOG(INFO) << "Process update " << to_string(update); - downcast_call(*update, OnUpdate(this, update, mpas.get_promise())); + downcast_call(*update, OnUpdate(this, update, get_promise())); CHECK(!running_get_difference_); } } if (update_pts_changed != nullptr) { - on_update(std::move(update_pts_changed), mpas.get_promise()); + on_update(std::move(update_pts_changed), get_promise()); } lock.set_value(Unit()); }