diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f80bada6..1be7afdfc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -452,10 +452,10 @@ set(TDLIB_SOURCE td/telegram/net/MtprotoHeader.cpp td/telegram/net/NetActor.cpp td/telegram/net/NetQuery.cpp - td/telegram/net/NetQueryCounter.cpp td/telegram/net/NetQueryCreator.cpp td/telegram/net/NetQueryDelayer.cpp td/telegram/net/NetQueryDispatcher.cpp + td/telegram/net/NetQueryStats.cpp td/telegram/net/NetStatsManager.cpp td/telegram/net/Proxy.cpp td/telegram/net/PublicRsaKeyShared.cpp @@ -628,6 +628,7 @@ set(TDLIB_SOURCE td/telegram/net/NetQueryCreator.h td/telegram/net/NetQueryDelayer.h td/telegram/net/NetQueryDispatcher.h + td/telegram/net/NetQueryStats.h td/telegram/net/NetStatsManager.h td/telegram/net/NetType.h td/telegram/net/Proxy.h diff --git a/td/generate/scheme/telegram_api.tl b/td/generate/scheme/telegram_api.tl index 0bd9e6749..139f6cdb0 100644 --- a/td/generate/scheme/telegram_api.tl +++ b/td/generate/scheme/telegram_api.tl @@ -96,7 +96,7 @@ storage.fileMp4#b3cea0e4 = storage.FileType; storage.fileWebp#1081464c = storage.FileType; userEmpty#200250ba id:int = User; -user#938458c1 flags:# self:flags.10?true contact:flags.11?true mutual_contact:flags.12?true deleted:flags.13?true bot:flags.14?true bot_chat_history:flags.15?true bot_nochats:flags.16?true verified:flags.17?true restricted:flags.18?true min:flags.20?true bot_inline_geo:flags.21?true support:flags.23?true scam:flags.24?true id:int access_hash:flags.0?long first_name:flags.1?string last_name:flags.2?string username:flags.3?string phone:flags.4?string photo:flags.5?UserProfilePhoto status:flags.6?UserStatus bot_info_version:flags.14?int restriction_reason:flags.18?Vector bot_inline_placeholder:flags.19?string lang_code:flags.22?string = User; +user#938458c1 flags:# self:flags.10?true contact:flags.11?true mutual_contact:flags.12?true deleted:flags.13?true bot:flags.14?true bot_chat_history:flags.15?true bot_nochats:flags.16?true verified:flags.17?true restricted:flags.18?true min:flags.20?true bot_inline_geo:flags.21?true support:flags.23?true scam:flags.24?true apply_min_photo:flags.25?true id:int access_hash:flags.0?long first_name:flags.1?string last_name:flags.2?string username:flags.3?string phone:flags.4?string photo:flags.5?UserProfilePhoto status:flags.6?UserStatus bot_info_version:flags.14?int restriction_reason:flags.18?Vector bot_inline_placeholder:flags.19?string lang_code:flags.22?string = User; userProfilePhotoEmpty#4f11bae1 = UserProfilePhoto; userProfilePhoto#69d3ab26 flags:# has_video:flags.0?true photo_id:long photo_small:FileLocation photo_big:FileLocation dc_id:int = UserProfilePhoto; @@ -115,7 +115,7 @@ channel#d31a961e flags:# creator:flags.0?true left:flags.2?true broadcast:flags. channelForbidden#289da732 flags:# broadcast:flags.5?true megagroup:flags.8?true id:int access_hash:long title:string until_date:flags.16?int = Chat; chatFull#1b7c9db3 flags:# can_set_username:flags.7?true has_scheduled:flags.8?true id:int about:string participants:ChatParticipants chat_photo:flags.2?Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:flags.3?Vector pinned_msg_id:flags.6?int folder_id:flags.11?int = ChatFull; -channelFull#f0e6672a flags:# can_view_participants:flags.3?true can_set_username:flags.6?true can_set_stickers:flags.7?true hidden_prehistory:flags.10?true can_view_stats:flags.12?true can_set_location:flags.16?true has_scheduled:flags.19?true id:int about:string participants_count:flags.0?int admins_count:flags.1?int kicked_count:flags.2?int banned_count:flags.2?int online_count:flags.13?int read_inbox_max_id:int read_outbox_max_id:int unread_count:int chat_photo:Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:Vector migrated_from_chat_id:flags.4?int migrated_from_max_id:flags.4?int pinned_msg_id:flags.5?int stickerset:flags.8?StickerSet available_min_id:flags.9?int folder_id:flags.11?int linked_chat_id:flags.14?int location:flags.15?ChannelLocation slowmode_seconds:flags.17?int slowmode_next_send_date:flags.18?int stats_dc:flags.12?int pts:int = ChatFull; +channelFull#f0e6672a flags:# can_view_participants:flags.3?true can_set_username:flags.6?true can_set_stickers:flags.7?true hidden_prehistory:flags.10?true can_set_location:flags.16?true has_scheduled:flags.19?true can_view_stats:flags.20?true id:int about:string participants_count:flags.0?int admins_count:flags.1?int kicked_count:flags.2?int banned_count:flags.2?int online_count:flags.13?int read_inbox_max_id:int read_outbox_max_id:int unread_count:int chat_photo:Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:Vector migrated_from_chat_id:flags.4?int migrated_from_max_id:flags.4?int pinned_msg_id:flags.5?int stickerset:flags.8?StickerSet available_min_id:flags.9?int folder_id:flags.11?int linked_chat_id:flags.14?int location:flags.15?ChannelLocation slowmode_seconds:flags.17?int slowmode_next_send_date:flags.18?int stats_dc:flags.12?int pts:int = ChatFull; chatParticipant#c8d7493e user_id:int inviter_id:int date:int = ChatParticipant; chatParticipantCreator#da13538a user_id:int = ChatParticipant; diff --git a/td/generate/scheme/telegram_api.tlo b/td/generate/scheme/telegram_api.tlo index 20968326a..bd63deac4 100644 Binary files a/td/generate/scheme/telegram_api.tlo and b/td/generate/scheme/telegram_api.tlo differ diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index b0fb29be7..8f7fc03ef 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -7,6 +7,7 @@ #include "td/telegram/Client.h" #include "td/telegram/Td.h" +#include "td/telegram/TdCallback.h" #include "td/actor/actor.h" @@ -15,104 +16,22 @@ #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/MpscPollableQueue.h" +#include "td/utils/port/RwMutex.h" #include "td/utils/port/thread.h" #include #include -#include #include #include +#include #include namespace td { -#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED - -class Client::Impl final { - public: - Impl() { - concurrent_scheduler_ = make_unique(); - concurrent_scheduler_->init(0); - class Callback : public TdCallback { - public: - explicit Callback(Impl *client) : client_(client) { - } - void on_result(std::uint64_t id, td_api::object_ptr result) override { - client_->responses_.push_back({id, std::move(result)}); - } - void on_error(std::uint64_t id, td_api::object_ptr error) override { - client_->responses_.push_back({id, std::move(error)}); - } - - Callback(const Callback &) = delete; - Callback &operator=(const Callback &) = delete; - Callback(Callback &&) = delete; - Callback &operator=(Callback &&) = delete; - ~Callback() override { - client_->closed_ = true; - Scheduler::instance()->yield(); - } - - private: - Impl *client_; - }; - td_ = concurrent_scheduler_->create_actor_unsafe(0, "Td", make_unique(this)); - concurrent_scheduler_->start(); - } - - void send(Request request) { - requests_.push_back(std::move(request)); - } - - Response receive(double timeout) { - if (!requests_.empty()) { - auto guard = concurrent_scheduler_->get_main_guard(); - for (auto &request : requests_) { - send_closure_later(td_, &Td::request, request.id, std::move(request.function)); - } - requests_.clear(); - } - - if (responses_.empty()) { - concurrent_scheduler_->run_main(0); - } else { - ConcurrentScheduler::emscripten_clear_main_timeout(); - } - if (!responses_.empty()) { - auto result = std::move(responses_.front()); - responses_.pop_front(); - return result; - } - return {0, nullptr}; - } - - Impl(const Impl &) = delete; - Impl &operator=(const Impl &) = delete; - Impl(Impl &&) = delete; - Impl &operator=(Impl &&) = delete; - ~Impl() { - { - auto guard = concurrent_scheduler_->get_main_guard(); - td_.reset(); - } - while (!closed_) { - concurrent_scheduler_->run_main(0); - } - concurrent_scheduler_.reset(); - } - - private: - std::deque responses_; - std::vector requests_; - unique_ptr concurrent_scheduler_; - ActorOwn td_; - bool closed_ = false; -}; - -#else - class MultiTd : public Actor { public: + explicit MultiTd(Td::Options options) : options_(std::move(options)) { + } void create(int32 td_id, unique_ptr callback) { auto &td = tds_[td_id]; CHECK(td.empty()); @@ -120,61 +39,265 @@ class MultiTd : public Actor { string name = "Td"; class TdActorContext : public ActorContext { public: - explicit TdActorContext(std::string tag) : tag_(std::move(tag)) { + explicit TdActorContext(string tag) : tag_(std::move(tag)) { } int32 get_id() const override { return 0x172ae58d; } - std::string tag_; + string tag_; }; auto context = std::make_shared(to_string(td_id)); auto old_context = set_context(context); auto old_tag = set_tag(context->tag_); - td = create_actor("Td", std::move(callback)); + td = create_actor("Td", std::move(callback), options_); set_context(old_context); set_tag(old_tag); } - void send(int32 td_id, Client::Request request) { - auto &td = tds_[td_id]; + + void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { + auto &td = tds_[client_id]; CHECK(!td.empty()); - send_closure(td, &Td::request, request.id, std::move(request.function)); + send_closure(td, &Td::request, request_id, std::move(function)); } + void destroy(int32 td_id) { auto size = tds_.erase(td_id); CHECK(size == 1); } private: - std::unordered_map > tds_; + Td::Options options_; + std::unordered_map> tds_; +}; + +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED +class TdReceiver { + public: + MultiClient::Response receive(double timeout) { + if (!responses_.empty()) { + auto result = std::move(responses_.front()); + responses_.pop_front(); + return result; + } + return {0, 0, nullptr}; + } + + unique_ptr create_callback(MultiClient::ClientId client_id) { + class Callback : public TdCallback { + public: + Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { + } + void on_result(uint64 id, td_api::object_ptr result) override { + impl_->responses_.push_back({client_id_, id, std::move(result)}); + } + void on_error(uint64 id, td_api::object_ptr error) override { + impl_->responses_.push_back({client_id_, id, std::move(error)}); + } + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + Callback(Callback &&) = delete; + Callback &operator=(Callback &&) = delete; + ~Callback() override { + impl_->responses_.push_back({client_id_, 0, nullptr}); + } + + private: + MultiClient::ClientId client_id_; + TdReceiver *impl_; + }; + return td::make_unique(client_id, this); + } + + private: + std::queue responses_; +}; + +class MultiClient::Impl final { + public: + Impl() { + options_.net_query_stats = std::make_shared(); + concurrent_scheduler_ = make_unique(); + concurrent_scheduler_->init(0); + receiver_ = make_unique(); + concurrent_scheduler_->start(); + } + + ClientId create_client() { + auto client_id = ++client_id_; + tds_[client_id] = + concurrent_scheduler_->create_actor_unsafe(0, "Td", receiver_->create_callback(client_id), options_); + return client_id; + } + + void send(ClientId client_id, RequestId request_id, Function function) { + Request request; + request.client_id = client_id; + request.id = request_id; + request.function = std::move(function); + requests_.push_back(std::move(request)); + } + + Response receive(double timeout) { + if (!requests_.empty()) { + auto guard = concurrent_scheduler_->get_main_guard(); + for (auto &request : requests_) { + auto &td = tds_[request.client_id]; + CHECK(!td.empty()); + send_closure_later(td, &Td::request, request.id, std::move(request.function)); + } + requests_.clear(); + } + + auto response = receiver_->receive(0); + if (response.client_id == 0) { + concurrent_scheduler_->run_main(0); + response = receiver_->receive(0); + } else { + ConcurrentScheduler::emscripten_clear_main_timeout(); + } + if (response.client_id != 0 && !response.object) { + auto guard = concurrent_scheduler_->get_main_guard(); + tds_.erase(response.client_id); + } + return response; + } + + Impl() = default; + Impl(const Impl &) = delete; + Impl &operator=(const Impl &) = delete; + Impl(Impl &&) = delete; + Impl &operator=(Impl &&) = delete; + ~Impl() { + { + auto guard = concurrent_scheduler_->get_main_guard(); + for (auto &td : tds_) { + td.second = {}; + } + } + while (!tds_.empty()) { + receive(10); + } + concurrent_scheduler_->finish(); + } + + private: + unique_ptr receiver_; + struct Request { + ClientId client_id; + RequestId id; + Function function; + }; + std::vector requests_; + unique_ptr concurrent_scheduler_; + ClientId client_id_{0}; + Td::Options options_; + std::unordered_map> tds_; +}; + +class Client::Impl final { + public: + Impl() { + client_id_ = impl_.create_client(); + } + + void send(Request request) { + impl_.send(client_id_, request.id, std::move(request.function)); + } + + Response receive(double timeout) { + auto response = impl_.receive(timeout); + Response old_response; + old_response.id = response.id; + old_response.object = std::move(response.object); + return old_response; + } + + private: + MultiClient::Impl impl_; + MultiClient::ClientId client_id_; +}; + +#else + +class TdReceiver { + public: + TdReceiver() { + output_queue_ = std::make_shared(); + output_queue_->init(); + } + + MultiClient::Response receive(double timeout) { + VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout; + auto is_locked = receive_lock_.exchange(true); + CHECK(!is_locked); + auto response = receive_unlocked(timeout); + is_locked = receive_lock_.exchange(false); + CHECK(is_locked); + VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get(); + return response; + } + + unique_ptr create_callback(MultiClient::ClientId client_id) { + class Callback : public TdCallback { + public: + explicit Callback(MultiClient::ClientId client_id, std::shared_ptr output_queue) + : client_id_(client_id), output_queue_(std::move(output_queue)) { + } + void on_result(uint64 id, td_api::object_ptr result) override { + output_queue_->writer_put({client_id_, id, std::move(result)}); + } + void on_error(uint64 id, td_api::object_ptr error) override { + output_queue_->writer_put({client_id_, id, std::move(error)}); + } + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + Callback(Callback &&) = delete; + Callback &operator=(Callback &&) = delete; + ~Callback() override { + output_queue_->writer_put({client_id_, 0, nullptr}); + } + + private: + MultiClient::ClientId client_id_; + std::shared_ptr output_queue_; + }; + return td::make_unique(client_id, output_queue_); + } + + private: + using OutputQueue = MpscPollableQueue; + std::shared_ptr output_queue_; + int output_queue_ready_cnt_{0}; + std::atomic receive_lock_{false}; + + MultiClient::Response receive_unlocked(double timeout) { + if (output_queue_ready_cnt_ == 0) { + output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); + } + if (output_queue_ready_cnt_ > 0) { + output_queue_ready_cnt_--; + return output_queue_->reader_get_unsafe(); + } + if (timeout != 0) { + output_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); + return receive_unlocked(0); + } + return {0, 0, nullptr}; + } }; class MultiImpl { public: - static std::shared_ptr get() { - static std::mutex mutex; - static std::vector > impls; - std::unique_lock lock(mutex); - if (impls.size() == 0) { - impls.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4); - } - auto &impl = *std::min_element(impls.begin(), impls.end(), - [](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); }); - auto res = impl.lock(); - if (!res) { - res = std::make_shared(); - impl = res; - } - return res; - } - - MultiImpl() { + explicit MultiImpl(std::shared_ptr net_query_stats) { concurrent_scheduler_ = std::make_shared(); concurrent_scheduler_->init(3); concurrent_scheduler_->start(); { auto guard = concurrent_scheduler_->get_main_guard(); - multi_td_ = create_actor("MultiTd"); + Td::Options options; + options.net_query_stats = std::move(net_query_stats); + multi_td_ = create_actor("MultiTd", std::move(options)); } scheduler_thread_ = thread([concurrent_scheduler = concurrent_scheduler_] { @@ -187,19 +310,15 @@ class MultiImpl { MultiImpl(MultiImpl &&) = delete; MultiImpl &operator=(MultiImpl &&) = delete; - int32 create_id() { - static std::atomic id_{0}; - return id_.fetch_add(1) + 1; + int32 create(TdReceiver &receiver) { + auto id = create_id(); + create(id, receiver.create_callback(id)); + return id; } - void create(int32 td_id, unique_ptr callback) { + void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { auto guard = concurrent_scheduler_->get_send_guard(); - send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback)); - } - - void send(int32 td_id, Client::Request request) { - auto guard = concurrent_scheduler_->get_send_guard(); - send_closure(multi_td_, &MultiTd::send, td_id, std::move(request)); + send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function)); } void destroy(int32 td_id) { @@ -221,40 +340,99 @@ class MultiImpl { std::shared_ptr concurrent_scheduler_; thread scheduler_thread_; ActorOwn multi_td_; + + static int32 create_id() { + static std::atomic current_id{1}; + return current_id.fetch_add(1); + } + + void create(int32 td_id, unique_ptr callback) { + auto guard = concurrent_scheduler_->get_send_guard(); + send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback)); + } +}; + +class MultiImplPool { + public: + std::shared_ptr get() { + std::unique_lock lock(mutex_); + if (impls_.empty()) { + init_openssl_threads(); + + impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4); + } + auto &impl = *std::min_element(impls_.begin(), impls_.end(), + [](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); }); + auto res = impl.lock(); + if (!res) { + res = std::make_shared(net_query_stats_); + impl = res; + } + return res; + } + + private: + std::mutex mutex_; + std::vector> impls_; + std::shared_ptr net_query_stats_ = std::make_shared(); +}; + +class MultiClient::Impl final { + public: + ClientId create_client() { + auto impl = pool_.get(); + auto client_id = impl->create(*receiver_); + { + auto lock = impls_mutex_.lock_write().move_as_ok(); + impls_[client_id] = std::move(impl); + } + return client_id; + } + + void send(ClientId client_id, RequestId request_id, Function function) { + auto lock = impls_mutex_.lock_read().move_as_ok(); + auto it = impls_.find(client_id); + CHECK(it != impls_.end()); + it->second->send(client_id, request_id, std::move(function)); + } + + Response receive(double timeout) { + auto res = receiver_->receive(timeout); + if (res.client_id != 0 && !res.object) { + auto lock = impls_mutex_.lock_write().move_as_ok(); + impls_.erase(res.client_id); + } + return res; + } + + Impl() = default; + Impl(const Impl &) = delete; + Impl &operator=(const Impl &) = delete; + Impl(Impl &&) = delete; + Impl &operator=(Impl &&) = delete; + ~Impl() { + for (auto &it : impls_) { + it.second->destroy(it.first); + } + while (!impls_.empty()) { + receive(10); + } + } + + private: + MultiImplPool pool_; + RwMutex impls_mutex_; + std::unordered_map> impls_; + unique_ptr receiver_{make_unique()}; }; class Client::Impl final { public: - using OutputQueue = MpscPollableQueue; Impl() { - multi_impl_ = MultiImpl::get(); - td_id_ = multi_impl_->create_id(); - output_queue_ = std::make_shared(); - output_queue_->init(); - - class Callback : public TdCallback { - public: - explicit Callback(std::shared_ptr output_queue) : output_queue_(std::move(output_queue)) { - } - void on_result(std::uint64_t id, td_api::object_ptr result) override { - output_queue_->writer_put({id, std::move(result)}); - } - void on_error(std::uint64_t id, td_api::object_ptr error) override { - output_queue_->writer_put({id, std::move(error)}); - } - Callback(const Callback &) = delete; - Callback &operator=(const Callback &) = delete; - Callback(Callback &&) = delete; - Callback &operator=(Callback &&) = delete; - ~Callback() override { - output_queue_->writer_put({0, nullptr}); - } - - private: - std::shared_ptr output_queue_; - }; - - multi_impl_->create(td_id_, td::make_unique(output_queue_)); + static MultiImplPool pool; + multi_impl_ = pool.get(); + receiver_ = make_unique(); + td_id_ = multi_impl_->create(*receiver_); } void send(Client::Request request) { @@ -263,18 +441,20 @@ class Client::Impl final { return; } - multi_impl_->send(td_id_, std::move(request)); + multi_impl_->send(td_id_, request.id, std::move(request.function)); } Client::Response receive(double timeout) { - VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout; - auto is_locked = receive_lock_.exchange(true); - CHECK(!is_locked); - auto response = receive_unlocked(timeout); - is_locked = receive_lock_.exchange(false); - CHECK(is_locked); - VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get(); - return response; + auto res = receiver_->receive(timeout); + + if (res.client_id != 0 && !res.object) { + is_closed_ = true; + } + + Client::Response old_res; + old_res.id = res.id; + old_res.object = std::move(res.object); + return old_res; } Impl(const Impl &) = delete; @@ -290,37 +470,14 @@ class Client::Impl final { private: std::shared_ptr multi_impl_; + unique_ptr receiver_; - std::shared_ptr output_queue_; - int output_queue_ready_cnt_{0}; - std::atomic receive_lock_{false}; bool is_closed_{false}; int32 td_id_; - - Client::Response receive_unlocked(double timeout) { - if (output_queue_ready_cnt_ == 0) { - output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); - } - if (output_queue_ready_cnt_ > 0) { - output_queue_ready_cnt_--; - auto res = output_queue_->reader_get_unsafe(); - if (res.object == nullptr && res.id == 0) { - is_closed_ = true; - } - return res; - } - if (timeout != 0) { - output_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); - return receive_unlocked(0); - } - return {0, nullptr}; - } }; #endif Client::Client() : impl_(std::make_unique()) { - // At least it should be enough for everybody who uses TDLib - init_openssl_threads(); } void Client::send(Request &&request) { @@ -342,4 +499,27 @@ Client::~Client() = default; Client::Client(Client &&other) = default; Client &Client::operator=(Client &&other) = default; +MultiClient::MultiClient() : impl_(std::make_unique()) { +} + +MultiClient::ClientId MultiClient::create_client() { + return impl_->create_client(); +} + +void MultiClient::send(ClientId client_id, RequestId request_id, Function &&function) { + impl_->send(client_id, request_id, std::move(function)); +} + +MultiClient::Response MultiClient::receive(double timeout) { + return impl_->receive(timeout); +} + +MultiClient::Object MultiClient::execute(Function &&function) { + return Td::static_request(std::move(function)); +} + +MultiClient::~MultiClient() = default; +MultiClient::MultiClient(MultiClient &&other) = default; +MultiClient &MultiClient::operator=(MultiClient &&other) = default; + } // namespace td diff --git a/td/telegram/Client.h b/td/telegram/Client.h index ac49f1550..40f50613a 100644 --- a/td/telegram/Client.h +++ b/td/telegram/Client.h @@ -131,4 +131,38 @@ class Client final { std::unique_ptr impl_; }; +// --- EXPERIMENTAL --- +class MultiClient final { + public: + MultiClient(); + + using ClientId = std::int32_t; + using RequestId = std::uint64_t; + using Function = td_api::object_ptr; + using Object = td_api::object_ptr; + struct Response { + ClientId client_id; + RequestId id; + Object object; + }; + + ClientId create_client(); + + void send(ClientId client_id, RequestId request_id, Function &&function); + + Response receive(double timeout); + + static Object execute(Function &&function); + + ~MultiClient(); + + MultiClient(MultiClient &&other); + + MultiClient &operator=(MultiClient &&other); + + private: + class Impl; + std::unique_ptr impl_; +}; + } // namespace td diff --git a/td/telegram/ClientActor.cpp b/td/telegram/ClientActor.cpp index a53fd7e66..61932e486 100644 --- a/td/telegram/ClientActor.cpp +++ b/td/telegram/ClientActor.cpp @@ -9,12 +9,15 @@ #include "td/telegram/td_api.h" #include "td/telegram/net/NetQueryCounter.h" +#include "td/telegram/net/NetQueryStats.h" #include "td/telegram/Td.h" namespace td { -ClientActor::ClientActor(unique_ptr callback) { - td_ = create_actor("Td", std::move(callback)); +ClientActor::ClientActor(unique_ptr callback, Options options) { + Td::Options td_options; + td_options.net_query_stats = options.net_query_stats; + td_ = create_actor("Td", std::move(callback), std::move(td_options)); } void ClientActor::request(uint64 id, td_api::object_ptr request) { @@ -31,8 +34,16 @@ td_api::object_ptr ClientActor::execute(td_api::object_ptr create_net_query_stats() { + return std::make_shared(); +} + +void dump_pending_network_queries(NetQueryStats &stats) { + stats.dump_pending_network_queries(); +} + +uint64 get_pending_network_query_count(NetQueryStats &stats) { + return stats.get_count(); } } // namespace td diff --git a/td/telegram/ClientActor.h b/td/telegram/ClientActor.h index 54cb7250a..3a6fd5729 100644 --- a/td/telegram/ClientActor.h +++ b/td/telegram/ClientActor.h @@ -8,17 +8,20 @@ ///\file -#include "td/actor/actor.h" +#include "td/telegram/TdCallback.h" #include "td/telegram/td_api.h" #include "td/telegram/td_api.hpp" -#include "td/telegram/TdCallback.h" +#include "td/actor/actor.h" #include "td/utils/common.h" +#include + namespace td { +class NetQueryStats; class Td; /** @@ -27,11 +30,18 @@ class Td; */ class ClientActor : public Actor { public: + struct Options { + std::shared_ptr net_query_stats; + + Options() { + } + }; + /** * Creates a ClientActor using the specified callback. * \param[in] callback Callback for outgoing notifications from TDLib. */ - explicit ClientActor(unique_ptr callback); + explicit ClientActor(unique_ptr callback, Options options = {}); /** * Sends one request to TDLib. The answer will be received via callback. @@ -70,16 +80,18 @@ class ClientActor : public Actor { ActorOwn td_; }; +std::shared_ptr create_net_query_stats(); + /** * Dumps information about all pending network queries to the internal TDLib log. * This is useful for library debugging. */ -void dump_pending_network_queries(); +void dump_pending_network_queries(NetQueryStats &stats); /** * Returns the current number of pending network queries. Useful for library debugging. * \return Number of currently pending network queries. */ -uint64 get_pending_network_query_count(); +uint64 get_pending_network_query_count(NetQueryStats &stats); } // namespace td diff --git a/td/telegram/ContactsManager.cpp b/td/telegram/ContactsManager.cpp index 3c1f5b062..0ab72f4bc 100644 --- a/td/telegram/ContactsManager.cpp +++ b/td/telegram/ContactsManager.cpp @@ -2525,7 +2525,7 @@ class GetChannelParticipantQuery : public Td::ResultHandler { void on_error(uint64 id, Status status) override { if (status.message() == "USER_NOT_PARTICIPANT") { - promise_.set_value({user_id_, UserId(), 0, DialogParticipantStatus::Left()}); + promise_.set_value(DialogParticipant::left(user_id_)); return; } @@ -3220,6 +3220,7 @@ void ContactsManager::User::store(StorerT &storer) const { STORE_FLAG(is_contact); STORE_FLAG(is_mutual_contact); STORE_FLAG(has_restriction_reasons); + STORE_FLAG(need_apply_min_photo); END_STORE_FLAGS(); store(first_name, storer); if (has_last_name) { @@ -3288,6 +3289,7 @@ void ContactsManager::User::parse(ParserT &parser) { PARSE_FLAG(is_contact); PARSE_FLAG(is_mutual_contact); PARSE_FLAG(has_restriction_reasons); + PARSE_FLAG(need_apply_min_photo); END_PARSE_FLAGS(); parse(first_name, parser); if (has_last_name) { @@ -3750,6 +3752,8 @@ void ContactsManager::ChannelFull::store(StorerT &storer) const { STORE_FLAG(is_slow_mode_delay_active); STORE_FLAG(has_stats_dc_id); STORE_FLAG(has_photo); + STORE_FLAG(is_can_view_statistics_inited); + STORE_FLAG(can_view_statistics); END_STORE_FLAGS(); if (has_description) { store(description, storer); @@ -3841,6 +3845,8 @@ void ContactsManager::ChannelFull::parse(ParserT &parser) { PARSE_FLAG(is_slow_mode_delay_active); PARSE_FLAG(has_stats_dc_id); PARSE_FLAG(has_photo); + PARSE_FLAG(is_can_view_statistics_inited); + PARSE_FLAG(can_view_statistics); END_PARSE_FLAGS(); if (has_description) { parse(description, parser); @@ -3893,6 +3899,9 @@ void ContactsManager::ChannelFull::parse(ParserT &parser) { if (legacy_can_view_statistics) { LOG(DEBUG) << "Ignore legacy can view statistics flag"; } + if (!is_can_view_statistics_inited) { + can_view_statistics = stats_dc_id.is_exact(); + } } template @@ -5978,7 +5987,8 @@ void ContactsManager::set_channel_slow_mode_delay(DialogId dialog_id, int32 slow td_->create_handler(std::move(promise))->send(channel_id, slow_mode_delay); } -void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promise &&promise) { +void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, bool for_full_statistics, + Promise &&promise) { if (!dialog_id.is_valid()) { return promise.set_error(Status::Error(400, "Invalid chat identifier specified")); } @@ -5997,11 +6007,13 @@ void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promisestats_dc_id.is_exact()) { - auto query_promise = PromiseCreator::lambda( - [actor_id = actor_id(this), channel_id, promise = std::move(promise)](Result result) mutable { - send_closure(actor_id, &ContactsManager::get_channel_statistics_dc_id_impl, channel_id, std::move(promise)); - }); + if (channel_full == nullptr || !channel_full->stats_dc_id.is_exact() || + (for_full_statistics && !channel_full->can_view_statistics)) { + auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), channel_id, for_full_statistics, + promise = std::move(promise)](Result result) mutable { + send_closure(actor_id, &ContactsManager::get_channel_statistics_dc_id_impl, channel_id, for_full_statistics, + std::move(promise)); + }); send_get_channel_full_query(channel_full, channel_id, std::move(query_promise), "get_channel_statistics_dc_id"); return; } @@ -6009,7 +6021,8 @@ void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promisestats_dc_id)); } -void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, Promise &&promise) { +void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, bool for_full_statistics, + Promise &&promise) { if (G()->close_flag()) { return promise.set_error(Status::Error(500, "Request aborted")); } @@ -6019,7 +6032,7 @@ void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, Pr return promise.set_error(Status::Error(400, "Chat full info not found")); } - if (!channel_full->stats_dc_id.is_exact()) { + if (!channel_full->stats_dc_id.is_exact() || (for_full_statistics && !channel_full->can_view_statistics)) { return promise.set_error(Status::Error(400, "Chat statistics is not available")); } @@ -6036,7 +6049,7 @@ void ContactsManager::get_channel_statistics(DialogId dialog_id, bool is_dark, send_closure(actor_id, &ContactsManager::send_get_channel_stats_query, r_dc_id.move_as_ok(), dialog_id.get_channel_id(), is_dark, std::move(promise)); }); - get_channel_statistics_dc_id(dialog_id, std::move(dc_id_promise)); + get_channel_statistics_dc_id(dialog_id, true, std::move(dc_id_promise)); } void ContactsManager::send_get_channel_stats_query(DcId dc_id, ChannelId channel_id, bool is_dark, @@ -6063,7 +6076,7 @@ void ContactsManager::load_statistics_graph(DialogId dialog_id, const string &to send_closure(actor_id, &ContactsManager::send_load_async_graph_query, r_dc_id.move_as_ok(), std::move(token), x, std::move(promise)); }); - get_channel_statistics_dc_id(dialog_id, std::move(dc_id_promise)); + get_channel_statistics_dc_id(dialog_id, false, std::move(dc_id_promise)); } void ContactsManager::send_load_async_graph_query(DcId dc_id, string token, int64 x, @@ -7215,7 +7228,9 @@ void ContactsManager::on_get_user(tl_object_ptr &&user_ptr, if (is_received || !user->phone_.empty()) { on_update_user_phone_number(u, user_id, std::move(user->phone_)); } - on_update_user_photo(u, user_id, std::move(user->photo_), source); + if (is_received || u->need_apply_min_photo) { + on_update_user_photo(u, user_id, std::move(user->photo_), source); + } if (is_received) { on_update_user_online(u, user_id, std::move(user->status_)); @@ -7240,6 +7255,7 @@ void ContactsManager::on_get_user(tl_object_ptr &&user_ptr, string inline_query_placeholder = user->bot_inline_placeholder_; bool need_location_bot = (flags & USER_FLAG_NEED_LOCATION_BOT) != 0; bool has_bot_info_version = (flags & USER_FLAG_HAS_BOT_INFO_VERSION) != 0; + bool need_apply_min_photo = (flags & USER_FLAG_NEED_APPLY_MIN_PHOTO) != 0; LOG_IF(ERROR, !is_support && expect_support) << "Receive non-support " << user_id << ", but expected a support user"; LOG_IF(ERROR, !can_join_groups && !is_bot) @@ -7261,6 +7277,7 @@ void ContactsManager::on_get_user(tl_object_ptr &&user_ptr, inline_query_placeholder = string(); need_location_bot = false; has_bot_info_version = false; + need_apply_min_photo = false; } LOG_IF(ERROR, has_bot_info_version && !is_bot) @@ -7294,6 +7311,10 @@ void ContactsManager::on_get_user(tl_object_ptr &&user_ptr, LOG(DEBUG) << "Bot info version has changed for " << user_id; u->need_save_to_database = true; } + if (u->need_apply_min_photo != need_apply_min_photo) { + u->need_apply_min_photo = need_apply_min_photo; + u->need_save_to_database = true; + } if (is_received && !u->is_received) { u->is_received = true; @@ -7561,7 +7582,7 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) { if (user_id == UserId(777000) && (u == nullptr || !u->is_received)) { int32 flags = telegram_api::user::ACCESS_HASH_MASK | telegram_api::user::FIRST_NAME_MASK | telegram_api::user::PHONE_MASK | telegram_api::user::PHOTO_MASK | telegram_api::user::VERIFIED_MASK | - telegram_api::user::SUPPORT_MASK; + telegram_api::user::SUPPORT_MASK | telegram_api::user::APPLY_MIN_PHOTO_MASK; auto profile_photo = telegram_api::make_object( 0, false /*ignored*/, 3337190045231023, telegram_api::make_object(107738948, 13226), @@ -7574,8 +7595,8 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) { auto user = telegram_api::make_object( flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, - false /*ignored*/, false /*ignored*/, false /*ignored*/, 777000, 1, "Telegram", string(), string(), "42777", - std::move(profile_photo), nullptr, 0, Auto(), string(), string()); + false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, 777000, 1, "Telegram", string(), + string(), "42777", std::move(profile_photo), nullptr, 0, Auto(), string(), string()); on_get_user(std::move(user), "get_user_force"); u = get_user(user_id); CHECK(u != nullptr && u->is_received); @@ -9471,15 +9492,20 @@ void ContactsManager::on_get_chat_full(tl_object_ptr &&c auto can_set_sticker_set = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_SET_STICKER_SET) != 0; auto can_set_location = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_SET_LOCATION) != 0; auto is_all_history_available = (channel_full->flags_ & CHANNEL_FULL_FLAG_IS_ALL_HISTORY_HIDDEN) == 0; + auto can_view_statistics = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS) != 0; StickerSetId sticker_set_id; if (channel_full->stickerset_ != nullptr) { sticker_set_id = td_->stickers_manager_->on_get_sticker_set(std::move(channel_full->stickerset_), true, "on_get_channel_full"); } DcId stats_dc_id; - if ((channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS) != 0) { + if ((channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_STATISTICS_DC_ID) != 0) { stats_dc_id = DcId::create(channel_full->stats_dc_); } + if (!stats_dc_id.is_exact() && can_view_statistics) { + LOG(ERROR) << "Receive can_view_statistics == true, but invalid statistics DC ID in " << channel_id; + can_view_statistics = false; + } ChannelFull *channel = add_channel_full(channel_id); channel->repair_request_version = 0; @@ -9488,8 +9514,9 @@ void ContactsManager::on_get_chat_full(tl_object_ptr &&c channel->administrator_count != administrator_count || channel->restricted_count != restricted_count || channel->banned_count != banned_count || channel->can_get_participants != can_get_participants || channel->can_set_username != can_set_username || channel->can_set_sticker_set != can_set_sticker_set || - channel->can_set_location != can_set_location || channel->stats_dc_id != stats_dc_id || - channel->sticker_set_id != sticker_set_id || channel->is_all_history_available != is_all_history_available) { + channel->can_set_location != can_set_location || channel->can_view_statistics != can_view_statistics || + channel->stats_dc_id != stats_dc_id || channel->sticker_set_id != sticker_set_id || + channel->is_all_history_available != is_all_history_available) { channel->description = std::move(channel_full->about_); channel->participant_count = participant_count; channel->administrator_count = administrator_count; @@ -9499,6 +9526,7 @@ void ContactsManager::on_get_chat_full(tl_object_ptr &&c channel->can_set_username = can_set_username; channel->can_set_sticker_set = can_set_sticker_set; channel->can_set_location = can_set_location; + channel->can_view_statistics = can_view_statistics; channel->stats_dc_id = stats_dc_id; channel->is_all_history_available = is_all_history_available; channel->sticker_set_id = sticker_set_id; @@ -9511,6 +9539,10 @@ void ContactsManager::on_get_chat_full(tl_object_ptr &&c update_channel(c, channel_id); } } + if (!channel->is_can_view_statistics_inited) { + channel->is_can_view_statistics_inited = true; + channel->need_save_to_database = true; + } on_update_channel_full_photo( channel, channel_id, @@ -12105,6 +12137,43 @@ void ContactsManager::on_update_channel_default_permissions(ChannelId channel_id } } +void ContactsManager::on_update_channel_participant(ChannelId channel_id, UserId user_id, int32 date, + tl_object_ptr old_participant, + tl_object_ptr new_participant) { + if (!td_->auth_manager_->is_bot()) { + LOG(ERROR) << "Receive updateChannelParticipant by non-bot"; + return; + } + if (!channel_id.is_valid() || !user_id.is_valid() || date <= 0 || + (old_participant == nullptr && new_participant == nullptr)) { + LOG(ERROR) << "Receive invalid updateChannelParticipant in " << channel_id << " for " << user_id << " at " << date + << ": " << to_string(old_participant) << " -> " << to_string(new_participant); + return; + } + + DialogParticipant old_dialog_participant; + DialogParticipant new_dialog_participant; + if (old_participant != nullptr) { + old_dialog_participant = get_dialog_participant(channel_id, std::move(old_participant)); + if (new_participant == nullptr) { + new_dialog_participant = DialogParticipant::left(old_dialog_participant.user_id); + } else { + new_dialog_participant = get_dialog_participant(channel_id, std::move(new_participant)); + } + } else { + new_dialog_participant = get_dialog_participant(channel_id, std::move(new_participant)); + old_dialog_participant = DialogParticipant::left(new_dialog_participant.user_id); + } + if (old_dialog_participant.user_id != new_dialog_participant.user_id || !old_dialog_participant.is_valid() || + !new_dialog_participant.is_valid()) { + LOG(ERROR) << "Receive wrong updateChannelParticipant: " << old_dialog_participant << " -> " + << new_dialog_participant; + return; + } + + // TODO send update +} + void ContactsManager::update_contacts_hints(const User *u, UserId user_id, bool from_database) { bool is_contact = is_user_contact(u, user_id); if (td_->auth_manager_->is_bot()) { @@ -13165,7 +13234,7 @@ DialogParticipant ContactsManager::get_chat_participant(ChatId chat_id, UserId u auto result = get_chat_participant(chat_id, user_id); if (result == nullptr) { - return {user_id, UserId(), 0, DialogParticipantStatus::Left()}; + return DialogParticipant::left(user_id); } return *result; @@ -14165,7 +14234,7 @@ tl_object_ptr ContactsManager::get_supergroup_full_i channel_full->participant_count, channel_full->administrator_count, channel_full->restricted_count, channel_full->banned_count, DialogId(channel_full->linked_channel_id).get(), channel_full->slow_mode_delay, slow_mode_delay_expires_in, channel_full->can_get_participants, channel_full->can_set_username, - channel_full->can_set_sticker_set, channel_full->can_set_location, channel_full->stats_dc_id.is_exact(), + channel_full->can_set_sticker_set, channel_full->can_set_location, channel_full->can_view_statistics, channel_full->is_all_history_available, channel_full->sticker_set_id.get(), channel_full->location.get_chat_location_object(), channel_full->invite_link, get_basic_group_id_object_internal(channel_full->migrated_from_chat_id, "get_supergroup_full_info_object"), diff --git a/td/telegram/ContactsManager.h b/td/telegram/ContactsManager.h index 0453a7f27..6360e1cc9 100644 --- a/td/telegram/ContactsManager.h +++ b/td/telegram/ContactsManager.h @@ -203,6 +203,9 @@ class ContactsManager : public Actor { void on_update_channel_is_all_history_available(ChannelId channel_id, bool is_all_history_available); void on_update_channel_default_permissions(ChannelId channel_id, RestrictedRights default_permissions); void on_update_channel_administrator_count(ChannelId channel_id, int32 administrator_count); + void on_update_channel_participant(ChannelId channel_id, UserId user_id, int32 date, + tl_object_ptr old_participant, + tl_object_ptr new_participant); int32 on_update_peer_located(vector> &&peers, bool from_update); @@ -606,7 +609,7 @@ class ContactsManager : public Actor { std::unordered_map online_member_dialogs; // id -> time - static constexpr uint32 CACHE_VERSION = 2; + static constexpr uint32 CACHE_VERSION = 3; uint32 cache_version = 0; bool is_min_access_hash = true; @@ -622,6 +625,7 @@ class ContactsManager : public Actor { bool is_scam = false; bool is_contact = false; bool is_mutual_contact = false; + bool need_apply_min_photo = false; bool is_photo_inited = false; @@ -860,6 +864,8 @@ class ContactsManager : public Actor { bool can_set_username = false; bool can_set_sticker_set = false; bool can_set_location = false; + bool can_view_statistics = false; + bool is_can_view_statistics_inited = false; bool is_all_history_available = true; bool is_slow_mode_next_send_date_changed = true; @@ -974,6 +980,7 @@ class ContactsManager : public Actor { static constexpr int32 USER_FLAG_HAS_LANGUAGE_CODE = 1 << 22; static constexpr int32 USER_FLAG_IS_SUPPORT = 1 << 23; static constexpr int32 USER_FLAG_IS_SCAM = 1 << 24; + static constexpr int32 USER_FLAG_NEED_APPLY_MIN_PHOTO = 1 << 25; static constexpr int32 USER_FULL_FLAG_IS_BLOCKED = 1 << 0; static constexpr int32 USER_FULL_FLAG_HAS_ABOUT = 1 << 1; @@ -1028,7 +1035,7 @@ class ContactsManager : public Actor { static constexpr int32 CHANNEL_FULL_FLAG_HAS_AVAILABLE_MIN_MESSAGE_ID = 1 << 9; static constexpr int32 CHANNEL_FULL_FLAG_IS_ALL_HISTORY_HIDDEN = 1 << 10; static constexpr int32 CHANNEL_FULL_FLAG_HAS_FOLDER_ID = 1 << 11; - static constexpr int32 CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS = 1 << 12; + static constexpr int32 CHANNEL_FULL_FLAG_HAS_STATISTICS_DC_ID = 1 << 12; static constexpr int32 CHANNEL_FULL_FLAG_HAS_ONLINE_MEMBER_COUNT = 1 << 13; static constexpr int32 CHANNEL_FULL_FLAG_HAS_LINKED_CHANNEL_ID = 1 << 14; static constexpr int32 CHANNEL_FULL_FLAG_HAS_LOCATION = 1 << 15; @@ -1036,6 +1043,7 @@ class ContactsManager : public Actor { static constexpr int32 CHANNEL_FULL_FLAG_HAS_SLOW_MODE_DELAY = 1 << 17; static constexpr int32 CHANNEL_FULL_FLAG_HAS_SLOW_MODE_NEXT_SEND_DATE = 1 << 18; static constexpr int32 CHANNEL_FULL_FLAG_HAS_SCHEDULED_MESSAGES = 1 << 19; + static constexpr int32 CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS = 1 << 20; static constexpr int32 CHAT_INVITE_FLAG_IS_CHANNEL = 1 << 0; static constexpr int32 CHAT_INVITE_FLAG_IS_BROADCAST = 1 << 1; @@ -1428,9 +1436,9 @@ class ContactsManager : public Actor { tl_object_ptr input_check_password, Promise &&promise); - void get_channel_statistics_dc_id(DialogId dialog_id, Promise &&promise); + void get_channel_statistics_dc_id(DialogId dialog_id, bool for_full_statistics, Promise &&promise); - void get_channel_statistics_dc_id_impl(ChannelId channel_id, Promise &&promise); + void get_channel_statistics_dc_id_impl(ChannelId channel_id, bool for_full_statistics, Promise &&promise); void send_get_channel_stats_query(DcId dc_id, ChannelId channel_id, bool is_dark, Promise> &&promise); diff --git a/td/telegram/DialogParticipant.h b/td/telegram/DialogParticipant.h index 4bd30702a..981cf447c 100644 --- a/td/telegram/DialogParticipant.h +++ b/td/telegram/DialogParticipant.h @@ -370,6 +370,10 @@ struct DialogParticipant { DialogParticipant(tl_object_ptr &&participant_ptr, DialogParticipantStatus my_status); + static DialogParticipant left(UserId user_id) { + return {user_id, UserId(), 0, DialogParticipantStatus::Left()}; + } + bool is_valid() const; template diff --git a/td/telegram/Global.cpp b/td/telegram/Global.cpp index 1901ae09d..eb4c0c69d 100644 --- a/td/telegram/Global.cpp +++ b/td/telegram/Global.cpp @@ -219,6 +219,10 @@ bool Global::ignore_backgrond_updates() const { shared_config_->get_option_boolean("ignore_background_updates"); } +void Global::set_net_query_stats(std::shared_ptr net_query_stats) { + net_query_creator_.set_create_func([=] { return td::make_unique(net_query_stats); }); +} + void Global::set_net_query_dispatcher(unique_ptr net_query_dispatcher) { net_query_dispatcher_ = std::move(net_query_dispatcher); } diff --git a/td/telegram/Global.h b/td/telegram/Global.h index 3b9cfc40a..e5dcac3f0 100644 --- a/td/telegram/Global.h +++ b/td/telegram/Global.h @@ -102,9 +102,10 @@ class Global : public ActorContext { bool ignore_backgrond_updates() const; NetQueryCreator &net_query_creator() { - return net_query_creator_.get(); + return *net_query_creator_.get(); } + void set_net_query_stats(std::shared_ptr net_query_stats); void set_net_query_dispatcher(unique_ptr net_query_dispatcher); NetQueryDispatcher &net_query_dispatcher() { @@ -422,7 +423,7 @@ class Global : public ActorContext { ActorId state_manager_; - SchedulerLocalStorage net_query_creator_; + LazySchedulerLocalStorage> net_query_creator_; unique_ptr net_query_dispatcher_; unique_ptr shared_config_; diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 665cc9937..c094238b7 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -12448,14 +12448,14 @@ int64 MessagesManager::get_dialog_pinned_order(const DialogList *list, DialogId return DEFAULT_ORDER; } -void MessagesManager::set_dialog_is_pinned(DialogId dialog_id, bool is_pinned) { +bool MessagesManager::set_dialog_is_pinned(DialogId dialog_id, bool is_pinned) { if (td_->auth_manager_->is_bot()) { - return; + return false; } Dialog *d = get_dialog(dialog_id); CHECK(d != nullptr); - set_dialog_is_pinned(DialogListId(d->folder_id), d, is_pinned); + return set_dialog_is_pinned(DialogListId(d->folder_id), d, is_pinned); } bool MessagesManager::set_dialog_is_pinned(DialogListId dialog_list_id, Dialog *d, bool is_pinned, @@ -13099,6 +13099,7 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vectorare_pinned_dialogs_inited_; folder_list->are_pinned_dialogs_inited_ = true; if (pinned_dialog_ids != added_dialog_ids) { LOG(INFO) << "Update pinned chats order from " << format::as_array(pinned_dialog_ids) << " to " @@ -13122,15 +13123,24 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vectorparameters().use_message_db) { + LOG(INFO) << "Save empty pinned chat list in " << folder_id; + G()->td_db()->get_binlog_pmc()->set(PSTRING() << "pinned_dialog_ids" << folder_id.get(), ""); + } } promise.set_value(Unit()); diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index a1d308540..717285dcd 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -2137,7 +2137,7 @@ class MessagesManager : public Actor { static vector remove_secret_chat_dialog_ids(vector dialog_ids); - void set_dialog_is_pinned(DialogId dialog_id, bool is_pinned); + bool set_dialog_is_pinned(DialogId dialog_id, bool is_pinned); bool set_dialog_is_pinned(DialogListId dialog_list_id, Dialog *d, bool is_pinned, bool need_update_dialog_lists = true); diff --git a/td/telegram/NotificationManager.cpp b/td/telegram/NotificationManager.cpp index 5d65ccd9f..efe7023a6 100644 --- a/td/telegram/NotificationManager.cpp +++ b/td/telegram/NotificationManager.cpp @@ -3256,8 +3256,9 @@ Status NotificationManager::process_push_notification_payload(string payload, bo auto user = telegram_api::make_object( flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, - false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), sender_access_hash, user_name, - string(), string(), string(), std::move(sender_photo), nullptr, 0, Auto(), string(), string()); + false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), + sender_access_hash, user_name, string(), string(), string(), std::move(sender_photo), nullptr, 0, Auto(), + string(), string()); td_->contacts_manager_->on_get_user(std::move(user), "process_push_notification_payload"); } @@ -3577,8 +3578,8 @@ void NotificationManager::add_message_push_notification( auto user = telegram_api::make_object( flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, - false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), 0, user_name, string(), string(), - string(), nullptr, nullptr, 0, Auto(), string(), string()); + false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), 0, user_name, + string(), string(), string(), nullptr, nullptr, 0, Auto(), string(), string()); td_->contacts_manager_->on_get_user(std::move(user), "add_message_push_notification"); } diff --git a/td/telegram/PtsManager.h b/td/telegram/PtsManager.h index 3adbb259f..4d96b361f 100644 --- a/td/telegram/PtsManager.h +++ b/td/telegram/PtsManager.h @@ -26,8 +26,7 @@ class PtsManager { // 0 if not a checkpoint PtsId add_pts(int32 pts) { - CHECK(pts >= 0); - if (pts != 0) { + if (pts > 0) { mem_pts_ = pts; } return state_helper_.add(pts); diff --git a/td/telegram/SecretChatActor.cpp b/td/telegram/SecretChatActor.cpp index c7c20aaa7..55f07ef8f 100644 --- a/td/telegram/SecretChatActor.cpp +++ b/td/telegram/SecretChatActor.cpp @@ -153,7 +153,7 @@ void SecretChatActor::replay_create_chat(unique_ptr void SecretChatActor::add_inbound_message(unique_ptr message) { SCOPE_EXIT { if (message) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); } }; if (close_flag_) { @@ -877,7 +877,7 @@ Result> SecretChatActor::decrypt(BufferSl Status SecretChatActor::do_inbound_message_encrypted(unique_ptr message) { SCOPE_EXIT { if (message) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); } }; TRY_RESULT(decrypted, decrypt(message->encrypted_message)); @@ -919,7 +919,7 @@ Status SecretChatActor::do_inbound_message_encrypted(unique_ptr(MY_LAYER), SendFlag::None, Promise<>()); @@ -969,13 +969,13 @@ Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_la Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr message) { SCOPE_EXIT { - LOG_IF(FATAL, message && message->qts_ack) << "Lost qts_promise"; + CHECK(message == nullptr || !message->promise); }; auto in_seq_no = message->decrypted_message_layer->in_seq_no_; auto out_seq_no = message->decrypted_message_layer->out_seq_no_; auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer()); if (status.is_error() && status.code() != 2 /* not gap found */) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); if (message->logevent_id()) { LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id()); binlog_erase(context_->binlog(), message->logevent_id()); @@ -1010,14 +1010,14 @@ Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr(action_resend->start_seq_no_ / 2); uint32 finish_seq_no = static_cast(action_resend->end_seq_no_ / 2); if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages"); } LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no); for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) { auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no); if (it == out_seq_no_to_outbound_message_state_token_.end()) { - message->qts_ack.set_value(Unit()); + message->promise.set_value(Unit()); return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no)); } auto state_id = it->second; @@ -1198,7 +1198,7 @@ void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptrlogevent_id(); // qts - auto qts_promise = std::move(message->qts_ack); + auto qts_promise = std::move(message->promise); if (logevent_id == 0) { message->is_pending = true; @@ -1276,7 +1276,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptrqts_ack); + auto qts_promise = std::move(message->promise); // process message tl_object_ptr file; diff --git a/td/telegram/SecretChatsManager.cpp b/td/telegram/SecretChatsManager.cpp index 67b1ac642..5925b5335 100644 --- a/td/telegram/SecretChatsManager.cpp +++ b/td/telegram/SecretChatsManager.cpp @@ -43,18 +43,6 @@ namespace td { -// qts and seq_no -// Each EncryptedMessage (update_message) has qts. -// Such updates must be handled in order of qts -// -// Qts should be handled on level of SecretChatsManager -// 1. Each update can be received by SecretChatsManager multiple times. -// 2. Each update should be sent to SecretChatActor only once. (Though SecretChatActor mustn't rely it) -// 3. Updates must be send in order of qts, without gaps. -// 4. SecretChatActor must notify SecretChatManager when update is processed (saved in database) -// 5. Only after all updates <= qts are processed by SecretChatActor, UpdatesManager should be -// notified about new qts. -// // seq_no // 1. // x_in = 0 if we initiated secret chat. @@ -94,12 +82,6 @@ void SecretChatsManager::start_up() { dummy_mode_ = true; return; } - // TODO: use database wrapper - auto pmc = G()->td_db()->get_binlog_pmc(); - auto qts_str = pmc->get("updates.qts"); - if (!qts_str.empty()) { - init_qts(to_integer(qts_str)); - } class StateCallback : public StateManager::Callback { public: @@ -116,25 +98,6 @@ void SecretChatsManager::start_up() { send_closure(G()->state_manager(), &StateManager::add_callback, make_unique(actor_id(this))); } -void SecretChatsManager::init_qts(int qts) { - if (dummy_mode_ || close_flag_) { - return; - } - has_qts_ = true; - qts_manager_.init(qts); - LOG(INFO) << "Init secret chats qts " << tag("qts", qts); -} - -void SecretChatsManager::update_qts(int qts) { - if (dummy_mode_ || close_flag_ || qts < 0) { - return; - } - LOG(INFO) << "Update qts to " << qts; - add_qts(qts).set_value(Unit()); - has_qts_ = true; - LOG(INFO) << "Update secret chats qts " << tag("qts", qts); -} - void SecretChatsManager::create_chat(int32 user_id, int64 user_access_hash, Promise promise) { int32 random_id; ActorId actor; @@ -202,14 +165,6 @@ void SecretChatsManager::send_set_ttl_message(SecretChatId secret_chat_id, int32 send_closure(actor, &SecretChatActor::send_set_ttl_message, ttl, random_id, std::move(safe_promise)); } -void SecretChatsManager::before_get_difference(int32 qts) { - if (dummy_mode_ || close_flag_) { - return; - } - last_get_difference_qts_ = qts; - // We will receive all updates later than qts anyway. -} - void SecretChatsManager::on_update_chat(tl_object_ptr update) { if (dummy_mode_ || close_flag_) { return; @@ -228,47 +183,22 @@ void SecretChatsManager::do_update_chat(tl_object_ptrchat_)); } -void SecretChatsManager::on_update_message(tl_object_ptr update, - bool force_apply) { +void SecretChatsManager::on_new_message(tl_object_ptr &&message_ptr, + Promise &&promise) { if (dummy_mode_ || close_flag_) { return; } - // UpdatesManager MUST postpone updates during GetDifference - auto qts = update->qts_; - if (!force_apply) { - if (!has_qts_) { - LOG(INFO) << "Got update, don't know current qts. Force get_difference"; - force_get_difference(); - return; - } - if (qts <= last_get_difference_qts_) { - LOG(WARNING) << "Got updates with " << tag("qts", qts) << " lower or equal than " - << tag("last get difference qts", last_get_difference_qts_); - force_get_difference(); - return; - } - auto mem_qts = qts_manager_.mem_pts(); - if (qts <= mem_qts) { - LOG(WARNING) << "Duplicated update " << tag("qts", qts) << tag("mem_qts", mem_qts); - return; - } - if (qts != mem_qts + 1) { - LOG(WARNING) << "Got gap in qts " << mem_qts << " ... " << qts; - force_get_difference(); - // TODO wait 1 second? - return; - } - } + CHECK(message_ptr != nullptr); auto event = make_unique(); - event->qts = qts; - downcast_call(*update->message_, [&](auto &x) { + event->promise = std::move(promise); + downcast_call(*message_ptr, [&](auto &x) { event->chat_id = x.chat_id_; event->date = x.date_; event->encrypted_message = std::move(x.bytes_); }); - if (update->message_->get_id() == telegram_api::encryptedMessage::ID) { - auto message = move_tl_object_as(update->message_); + if (message_ptr->get_id() == telegram_api::encryptedMessage::ID) { + auto message = move_tl_object_as(message_ptr); if (message->file_->get_id() == telegram_api::encryptedFile::ID) { auto file = move_tl_object_as(message->file_); @@ -284,11 +214,6 @@ void SecretChatsManager::on_update_message(tl_object_ptr SecretChatsManager::add_qts(int32 qts) { - auto id = qts_manager_.add_pts(qts); - return PromiseCreator::event(self_closure(this, &SecretChatsManager::on_qts_ack, id)); -} - void SecretChatsManager::replay_binlog_event(BinlogEvent &&binlog_event) { if (dummy_mode_) { binlog_erase(G()->td_db()->get_binlog(), binlog_event.id_); @@ -324,14 +249,13 @@ void SecretChatsManager::binlog_replay_finish() { } void SecretChatsManager::replay_inbound_message(unique_ptr message) { - LOG(INFO) << "Replay inbound secret message in chat " << message->chat_id << " with qts " << message->qts; + LOG(INFO) << "Replay inbound secret message in chat " << message->chat_id; auto actor = get_chat_actor(message->chat_id); send_closure_later(actor, &SecretChatActor::replay_inbound_message, std::move(message)); } void SecretChatsManager::add_inbound_message(unique_ptr message) { - LOG(INFO) << "Process inbound secret message in chat " << message->chat_id << " with qts " << message->qts; - message->qts_ack = add_qts(message->qts); + LOG(INFO) << "Process inbound secret message in chat " << message->chat_id; auto actor = get_chat_actor(message->chat_id); send_closure(actor, &SecretChatActor::add_inbound_message, std::move(message)); @@ -358,11 +282,6 @@ void SecretChatsManager::replay_outbound_message(unique_ptrtd(), &Td::force_get_difference); -} - ActorId SecretChatsManager::get_chat_actor(int32 id) { return create_chat_actor_impl(id, false); } @@ -502,19 +421,6 @@ ActorId SecretChatsManager::create_chat_actor_impl(int32 id, bo } } -void SecretChatsManager::on_qts_ack(PtsManager::PtsId qts_ack_token) { - auto old_qts = qts_manager_.db_pts(); - auto new_qts = qts_manager_.finish(qts_ack_token); - if (old_qts != new_qts) { - save_qts(); - } -} - -void SecretChatsManager::save_qts() { - LOG(INFO) << "Save " << tag("qts", qts_manager_.db_pts()); - send_closure(G()->td(), &Td::update_qts, qts_manager_.db_pts()); -} - void SecretChatsManager::hangup() { close_flag_ = true; if (dummy_mode_) { diff --git a/td/telegram/SecretChatsManager.h b/td/telegram/SecretChatsManager.h index 904cdde1b..2e052fadb 100644 --- a/td/telegram/SecretChatsManager.h +++ b/td/telegram/SecretChatsManager.h @@ -29,16 +29,11 @@ struct BinlogEvent; class SecretChatsManager : public Actor { public: explicit SecretChatsManager(ActorShared<> parent); - void init_qts(int32 qts); - void update_qts(int32 qts); - // we can forget all pending_updates after start_get_difference they will be received after this point anyway - // It is not necessary, but it will help. - void before_get_difference(int32 qts); // Proxy query to corrensponding SecretChatActor. // Look for more info in SecretChatActor.h void on_update_chat(tl_object_ptr update); - void on_update_message(tl_object_ptr update, bool force_apply); + void on_new_message(tl_object_ptr &&message_ptr, Promise &&promise); void create_chat(int32 user_id, int64 user_access_hash, Promise promise); void cancel_chat(SecretChatId, Promise<> promise); @@ -60,13 +55,9 @@ class SecretChatsManager : public Actor { bool binlog_replay_finish_flag_ = false; bool dummy_mode_ = false; bool close_flag_ = false; - bool has_qts_ = false; ActorShared<> parent_; std::map> id_to_actor_; - PtsManager qts_manager_; - int32 last_get_difference_qts_ = -1; - bool is_online_{false}; std::vector>> pending_chat_updates_; @@ -83,10 +74,6 @@ class SecretChatsManager : public Actor { ActorId get_chat_actor(int32 id); ActorId create_chat_actor(int32 id); ActorId create_chat_actor_impl(int32 id, bool can_be_empty); - Promise<> add_qts(int32 qts); - void on_qts_ack(PtsManager::PtsId qts_ack_token); - void save_qts(); - void force_get_difference(); void start_up() override; void hangup() override; diff --git a/td/telegram/StickersManager.cpp b/td/telegram/StickersManager.cpp index 040f34098..1eadab106 100644 --- a/td/telegram/StickersManager.cpp +++ b/td/telegram/StickersManager.cpp @@ -2324,8 +2324,8 @@ StickerSetId StickersManager::on_get_messages_sticker_set(StickerSetId sticker_s CHECK(s != nullptr); CHECK(s->is_inited); - s->expires_at = G()->unix_time() + (td_->auth_manager_->is_bot() ? Random::fast(10 * 60, 15 * 60) - : Random::fast(20 * 60 * 60, 28 * 60 * 60)); + s->expires_at = G()->unix_time() + + (td_->auth_manager_->is_bot() ? Random::fast(10 * 60, 15 * 60) : Random::fast(30 * 60, 50 * 60)); if (s->is_loaded) { update_sticker_set(s); diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 5a2385d62..6125c4a44 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -125,7 +125,6 @@ #include "td/utils/Status.h" #include "td/utils/Timer.h" #include "td/utils/tl_parsers.h" -#include "td/utils/TsList.h" #include "td/utils/utf8.h" #include @@ -3005,7 +3004,8 @@ class SetBackgroundRequest : public RequestActor<> { } }; -Td::Td(unique_ptr callback) : callback_(std::move(callback)) { +Td::Td(unique_ptr callback, Options options) + : callback_(std::move(callback)), td_options_(std::move(options)) { } Td::~Td() = default; @@ -3520,22 +3520,6 @@ void Td::send(NetQueryPtr &&query) { G()->net_query_dispatcher().dispatch(std::move(query)); } -void Td::update_qts(int32 qts) { - if (close_flag_ > 1) { - return; - } - - updates_manager_->set_qts(qts); -} - -void Td::force_get_difference() { - if (close_flag_) { - return; - } - - updates_manager_->get_difference("force_get_difference"); -} - void Td::on_result(NetQueryPtr query) { query->debug("Td: received from DcManager"); VLOG(net_query) << "Receive result of " << query; @@ -3736,10 +3720,9 @@ void Td::start_up() { LOG_IF(FATAL, symbol != c) << "TDLib requires little-endian platform"; } - TsList::lock().unlock(); // initialize mutex before any NetQuery - VLOG(td_init) << "Create Global"; set_context(std::make_shared()); + G()->set_net_query_stats(td_options_.net_query_stats); inc_request_actor_refcnt(); // guard inc_actor_refcnt(); // guard @@ -3781,6 +3764,7 @@ ActorShared Td::create_reference() { inc_actor_refcnt(); return actor_shared(this, ActorIdType); } + void Td::inc_actor_refcnt() { actor_refcnt_++; } diff --git a/td/telegram/Td.h b/td/telegram/Td.h index 63b13d62f..11162a994 100644 --- a/td/telegram/Td.h +++ b/td/telegram/Td.h @@ -9,6 +9,7 @@ #include "td/telegram/files/FileId.h" #include "td/telegram/net/MtprotoHeader.h" #include "td/telegram/net/NetQuery.h" +#include "td/telegram/net/NetQueryStats.h" #include "td/telegram/StateManager.h" #include "td/telegram/TdCallback.h" #include "td/telegram/TdParameters.h" @@ -94,16 +95,15 @@ class Td final : public NetQueryCallback { Td &operator=(Td &&) = delete; ~Td() override; - explicit Td(unique_ptr callback); + struct Options { + std::shared_ptr net_query_stats; + }; + + Td(unique_ptr callback, Options options); void request(uint64 id, tl_object_ptr function); void destroy(); - void close(); - - void update_qts(int32 qts); - - void force_get_difference(); void schedule_get_terms_of_service(int32 expires_in); @@ -226,8 +226,6 @@ class Td final : public NetQueryCallback { void send_update(tl_object_ptr &&object); - ActorShared create_reference(); - static td_api::object_ptr static_request(td_api::object_ptr function); private: @@ -246,12 +244,15 @@ class Td final : public NetQueryCallback { void send_error_raw(uint64 id, int32 code, CSlice error); void answer_ok_query(uint64 id, Status status); + ActorShared create_reference(); + void inc_actor_refcnt(); void dec_actor_refcnt(); void inc_request_actor_refcnt(); void dec_request_actor_refcnt(); + void close(); void on_closed(); void dec_stop_cnt(); @@ -261,6 +262,7 @@ class Td final : public NetQueryCallback { TdParameters parameters_; unique_ptr callback_; + Options td_options_; StateManager::State connection_state_; diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 9066be3da..8eb33dc54 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -182,6 +182,10 @@ void UpdatesManager::fill_seq_gap(void *td) { fill_gap(td, "seq"); } +void UpdatesManager::fill_qts_gap(void *td) { + fill_gap(td, "qts"); +} + void UpdatesManager::fill_get_difference_gap(void *td) { fill_gap(td, "getDifference"); } @@ -227,9 +231,7 @@ void UpdatesManager::before_get_difference(bool is_initial) { send_closure(G()->state_manager(), &StateManager::on_synchronized, false); td_->messages_manager_->before_get_difference(); - if (!is_initial) { - send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts()); - } + send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } @@ -238,6 +240,11 @@ Promise<> UpdatesManager::add_pts(int32 pts) { return PromiseCreator::event(self_closure(this, &UpdatesManager::on_pts_ack, id)); } +Promise<> UpdatesManager::add_qts(int32 qts) { + auto id = qts_manager_.add_pts(qts); + return PromiseCreator::event(self_closure(this, &UpdatesManager::on_qts_ack, id)); +} + void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) { auto old_pts = pts_manager_.db_pts(); auto new_pts = pts_manager_.finish(ack_token); @@ -246,6 +253,14 @@ void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) { } } +void UpdatesManager::on_qts_ack(PtsManager::PtsId ack_token) { + auto old_qts = qts_manager_.db_pts(); + auto new_qts = qts_manager_.finish(ack_token); + if (old_qts != new_qts) { + save_qts(new_qts); + } +} + void UpdatesManager::save_pts(int32 pts) { if (pts == std::numeric_limits::max()) { G()->td_db()->get_binlog_pmc()->erase("updates.pts"); @@ -254,6 +269,12 @@ void UpdatesManager::save_pts(int32 pts) { } } +void UpdatesManager::save_qts(int32 qts) { + if (!G()->ignore_backgrond_updates()) { + G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts)); + } +} + Promise<> UpdatesManager::set_pts(int32 pts, const char *source) { if (pts == std::numeric_limits::max()) { LOG(WARNING) << "Update pts from " << get_pts() << " to -1 from " << source; @@ -281,19 +302,6 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) { return result; } -void UpdatesManager::set_qts(int32 qts) { - if (qts > qts_) { - LOG(INFO) << "Update qts to " << qts; - - qts_ = qts; - if (!G()->ignore_backgrond_updates()) { - G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts)); - } - } else if (qts < qts_) { - LOG(ERROR) << "Receive wrong qts = " << qts << ". Current qts = " << qts_; - } -} - void UpdatesManager::set_date(int32 date, bool from_update, string date_source) { if (date > date_) { LOG(INFO) << "Update date to " << date; @@ -791,7 +799,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptrpts_, full_source.c_str()).set_value(Unit()); set_date(state->date_, false, std::move(full_source)); - // set_qts(state->qts_); + add_qts(state->qts_).set_value(Unit()); seq_ = state->seq_; } @@ -952,11 +960,10 @@ void UpdatesManager::init_state() { } pts_manager_.init(to_integer(pts_str)); last_get_difference_pts_ = get_pts(); - qts_ = to_integer(pmc->get("updates.qts")); + qts_manager_.init(to_integer(pmc->get("updates.qts"))); date_ = to_integer(pmc->get("updates.date")); date_source_ = "database"; - LOG(DEBUG) << "Init: " << get_pts() << " " << qts_ << " " << date_; - send_closure(td_->secret_chats_manager_, &SecretChatsManager::init_qts, qts_); + LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_; get_difference("init_state"); } @@ -974,7 +981,7 @@ void UpdatesManager::on_server_pong(tl_object_ptr & void UpdatesManager::process_get_difference_updates( vector> &&new_messages, - vector> &&new_encrypted_messages, int32 qts, + vector> &&new_encrypted_messages, vector> &&other_updates) { VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, " << new_encrypted_messages.size() << " encrypted messages and " << other_updates.size() @@ -1014,9 +1021,9 @@ void UpdatesManager::process_get_difference_updates( } for (auto &encrypted_message : new_encrypted_messages) { - on_update(make_tl_object(std::move(encrypted_message), 0), true); + send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(encrypted_message), + Promise()); } - send_closure(td_->secret_chats_manager_, &SecretChatsManager::update_qts, qts); process_updates(std::move(other_updates), true); } @@ -1047,7 +1054,7 @@ void UpdatesManager::on_get_difference(tl_object_ptrcontacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference"); process_get_difference_updates(std::move(difference->new_messages_), - std::move(difference->new_encrypted_messages_), difference->state_->qts_, + std::move(difference->new_encrypted_messages_), std::move(difference->other_updates_)); if (running_get_difference_) { LOG(ERROR) << "Get difference has run while processing get difference updates"; @@ -1060,7 +1067,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr(difference_ptr); if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits::max() && - difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == qts_) { + difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) { // TODO send new getDifference request and apply difference slice only after that } @@ -1071,7 +1078,7 @@ void UpdatesManager::on_get_difference(tl_object_ptrnew_messages_), std::move(difference->new_encrypted_messages_), - difference->intermediate_state_->qts_, std::move(difference->other_updates_)); + std::move(difference->other_updates_)); if (running_get_difference_) { LOG(ERROR) << "Get difference has run while processing get difference updates"; break; @@ -1104,8 +1111,10 @@ void UpdatesManager::after_get_difference() { retry_timeout_.cancel_timeout(); retry_time_ = 1; - process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates coming before getDifference, but - // not returned in getDifference + process_pending_qts_updates(); + + process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates received before getDifference, + // but not returned in getDifference if (running_get_difference_) { return; } @@ -1279,6 +1288,11 @@ void UpdatesManager::on_pending_updates(vector(update), false); + processed_updates++; + update = nullptr; + } CHECK(!running_get_difference_); } } @@ -1289,7 +1303,8 @@ void UpdatesManager::on_pending_updates(vector &&update, int32 qts) { + CHECK(update != nullptr); + if (qts <= 1) { + LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update)); + return; + } + + int32 old_qts = get_qts(); + LOG(INFO) << "Process update with qts = " << qts << ", current qts = " << old_qts; + if (qts < old_qts - 1000001) { + LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by " + << oneline(to_string(update)); + add_qts(qts - 1).set_value(Unit()); + CHECK(get_qts() == qts - 1); + old_qts = qts - 1; + } + + if (qts <= old_qts) { + LOG(INFO) << "Skip already applied update with qts = " << qts; + return; + } + + CHECK(!running_get_difference_); + + if (qts > old_qts + 1) { + LOG(INFO) << "Postpone update with qts = " << qts; + if (pending_qts_updates_.empty()) { + set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME); + } + bool is_inserted = pending_qts_updates_.emplace(qts, std::move(update)).second; + if (!is_inserted) { + LOG(INFO) << "Receive duplicate update with qts = " << qts; + } + return; + } + + process_qts_update(std::move(update), qts); + process_pending_qts_updates(); +} + void UpdatesManager::process_updates(vector> &&updates, bool force_apply) { tl_object_ptr update_pts_changed; /* @@ -1409,6 +1464,28 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date, } } +void UpdatesManager::process_qts_update(tl_object_ptr &&update_ptr, int32 qts) { + LOG(DEBUG) << "Process " << to_string(update_ptr); + switch (update_ptr->get_id()) { + case telegram_api::updateNewEncryptedMessage::ID: { + auto update = move_tl_object_as(update_ptr); + send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_), + add_qts(qts)); + break; + } + case telegram_api::updateChannelParticipant::ID: { + auto update = move_tl_object_as(update_ptr); + td_->contacts_manager_->on_update_channel_participant(ChannelId(update->channel_id_), UserId(update->user_id_), + update->date_, std::move(update->prev_participant_), + std::move(update->new_participant_)); + break; + } + default: + UNREACHABLE(); + break; + } +} + void UpdatesManager::process_pending_seq_updates() { while (!pending_seq_updates_.empty() && !running_get_difference_) { auto update_it = pending_seq_updates_.begin(); @@ -1429,6 +1506,34 @@ void UpdatesManager::process_pending_seq_updates() { } if (pending_seq_updates_.empty()) { seq_gap_timeout_.cancel_timeout(); + } else { + // if after getDifference still have a gap + set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME); + } +} + +void UpdatesManager::process_pending_qts_updates() { + if (pending_qts_updates_.empty()) { + return; + } + LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates"; + while (!pending_qts_updates_.empty()) { + CHECK(!running_get_difference_); + auto update_it = pending_qts_updates_.begin(); + auto qts = update_it->first; + if (qts > get_qts() + 1) { + break; + } + if (qts == get_qts() + 1) { + process_qts_update(std::move(update_it->second), qts); + } + pending_qts_updates_.erase(update_it); + } + if (pending_qts_updates_.empty()) { + qts_gap_timeout_.cancel_timeout(); + } else { + // if after getDifference still have a gap + set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME); } } @@ -1440,6 +1545,14 @@ void UpdatesManager::set_seq_gap_timeout(double timeout) { } } +void UpdatesManager::set_qts_gap_timeout(double timeout) { + if (!qts_gap_timeout_.has_timeout()) { + qts_gap_timeout_.set_callback(std::move(fill_qts_gap)); + qts_gap_timeout_.set_callback_data(static_cast(td_)); + qts_gap_timeout_.set_timeout_in(timeout); + } +} + void UpdatesManager::on_pending_update(tl_object_ptr update, int32 seq, const char *source) { vector> updates; updates.push_back(std::move(update)); @@ -1915,7 +2028,12 @@ void UpdatesManager::on_update(tl_object_ptr upd } void UpdatesManager::on_update(tl_object_ptr update, bool force_apply) { - send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_update_message, std::move(update), force_apply); + if (force_apply) { + return process_qts_update(std::move(update), 0); + } + + auto qts = update->qts_; + add_pending_qts_update(std::move(update), qts); } void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { @@ -2039,7 +2157,16 @@ void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { - LOG(INFO) << "Receive updateLoginToken after authorization"; + LOG(INFO) << "Ignore updateLoginToken after authorization"; +} + +void UpdatesManager::on_update(tl_object_ptr update, bool force_apply) { + if (force_apply) { + return process_qts_update(std::move(update), 0); + } + + auto qts = update->qts_; + add_pending_qts_update(std::move(update), qts); } // unsupported updates @@ -2047,7 +2174,4 @@ void UpdatesManager::on_update(tl_object_ptr upd void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { } -void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { -} - } // namespace td diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 60dfca28d..91c50760c 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -63,7 +63,7 @@ class UpdatesManager : public Actor { return pts_manager_.mem_pts(); } int32 get_qts() const { - return qts_; + return qts_manager_.mem_pts(); } int32 get_date() const { return date_; @@ -71,8 +71,6 @@ class UpdatesManager : public Actor { Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT; - void set_qts(int32 qts); - static const double MAX_UNFILLED_GAP_TIME; static void fill_pts_gap(void *td); @@ -102,7 +100,7 @@ class UpdatesManager : public Actor { ActorShared<> parent_; PtsManager pts_manager_; - int32 qts_ = 0; + PtsManager qts_manager_; int32 date_ = 0; int32 seq_ = 0; string date_source_ = "nowhere"; @@ -112,8 +110,12 @@ class UpdatesManager : public Actor { std::multimap postponed_updates_; // updates received during getDifference std::multimap pending_seq_updates_; // updates with too big seq + std::map> pending_qts_updates_; // updates with too big qts + Timeout seq_gap_timeout_; + Timeout qts_gap_timeout_; + int32 retry_time_ = 1; Timeout retry_timeout_; @@ -126,6 +128,10 @@ class UpdatesManager : public Actor { void on_pts_ack(PtsManager::PtsId ack_token); void save_pts(int32 pts); + Promise<> add_qts(int32 qts); + void on_qts_ack(PtsManager::PtsId ack_token); + void save_qts(int32 qts); + void set_date(int32 date, bool from_update, string date_source); int32 get_short_update_date() const; @@ -135,10 +141,12 @@ class UpdatesManager : public Actor { void process_get_difference_updates(vector> &&new_messages, vector> &&new_encrypted_messages, - int32 qts, vector> &&other_updates); + vector> &&other_updates); void on_pending_update(tl_object_ptr update, int32 seq, const char *source); + void add_pending_qts_update(tl_object_ptr &&update, int32 qts); + void on_pending_updates(vector> &&updates, int32 seq_begin, int32 seq_end, int32 date, const char *source); @@ -146,16 +154,24 @@ class UpdatesManager : public Actor { void process_seq_updates(int32 seq_end, int32 date, vector> &&updates); + void process_qts_update(tl_object_ptr &&update_ptr, int32 qts); + void process_pending_seq_updates(); + void process_pending_qts_updates(); + static void fill_seq_gap(void *td); + static void fill_qts_gap(void *td); + static void fill_get_difference_gap(void *td); static void fill_gap(void *td, const char *source); void set_seq_gap_timeout(double timeout); + void set_qts_gap_timeout(double timeout); + void on_failed_get_difference(); void before_get_difference(bool is_initial); @@ -260,7 +276,7 @@ class UpdatesManager : public Actor { void on_update(tl_object_ptr update, bool /*force_apply*/); void on_update(tl_object_ptr update, bool /*force_apply*/); - void on_update(tl_object_ptr update, bool /*force_apply*/); + void on_update(tl_object_ptr update, bool force_apply); void on_update(tl_object_ptr update, bool /*force_apply*/); void on_update(tl_object_ptr update, bool /*force_apply*/); @@ -293,11 +309,11 @@ class UpdatesManager : public Actor { void on_update(tl_object_ptr update, bool /*force_apply*/); + void on_update(tl_object_ptr update, bool /*force_apply*/); + // unsupported updates void on_update(tl_object_ptr update, bool /*force_apply*/); - - void on_update(tl_object_ptr update, bool /*force_apply*/); }; } // namespace td diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index f45ec9c3c..c0fa48ef5 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include #include @@ -847,7 +848,10 @@ class CliClient final : public Actor { uint64 generation_; }; - td_client_ = create_actor(name, make_unique(this, ++generation_)); + ClientActor::Options options; + options.net_query_stats = net_query_stats_; + + td_client_ = create_actor(name, make_unique(this, ++generation_), std::move(options)); } void init_td() { @@ -4165,7 +4169,7 @@ class CliClient final : public Actor { } else if (op == "q" || op == "Quit") { quit(); } else if (op == "dnq" || op == "DumpNetQueries") { - dump_pending_network_queries(); + dump_pending_network_queries(*net_query_stats_); } else if (op == "fatal") { LOG(FATAL) << "Fatal!"; } else if (op == "unreachable") { @@ -4290,6 +4294,7 @@ class CliClient final : public Actor { ConcurrentScheduler *scheduler_{nullptr}; bool use_test_dc_ = false; + std::shared_ptr net_query_stats_ = create_net_query_stats(); ActorOwn td_client_; std::queue cmd_queue_; bool close_flag_ = false; diff --git a/td/telegram/logevent/SecretChatEvent.h b/td/telegram/logevent/SecretChatEvent.h index e0c22e481..c01e86993 100644 --- a/td/telegram/logevent/SecretChatEvent.h +++ b/td/telegram/logevent/SecretChatEvent.h @@ -206,13 +206,12 @@ inline StringBuilder &operator<<(StringBuilder &sb, const EncryptedFileLocation class InboundSecretMessage : public SecretChatLogEventBase { public: static constexpr Type type = SecretChatEvent::Type::InboundSecretMessage; - int32 qts = 0; int32 chat_id = 0; int32 date = 0; BufferSlice encrypted_message; // empty when we store event to binlog - Promise qts_ack; + Promise promise; bool is_checked = false; // after decrypted and checked @@ -240,13 +239,13 @@ class InboundSecretMessage : public SecretChatLogEventBase BEGIN_STORE_FLAGS(); STORE_FLAG(has_encrypted_file); STORE_FLAG(is_pending); + STORE_FLAG(true); END_STORE_FLAGS(); - store(qts, storer); store(chat_id, storer); store(date, storer); // skip encrypted_message - // skip qts_ack + // skip promise // TODO decrypted_message_layer->store(storer); @@ -265,16 +264,21 @@ class InboundSecretMessage : public SecretChatLogEventBase void parse(ParserT &parser) { using td::parse; + bool no_qts; BEGIN_PARSE_FLAGS(); PARSE_FLAG(has_encrypted_file); PARSE_FLAG(is_pending); + PARSE_FLAG(no_qts); END_PARSE_FLAGS(); - parse(qts, parser); + if (!no_qts) { + int32 legacy_qts; + parse(legacy_qts, parser); + } parse(chat_id, parser); parse(date, parser); // skip encrypted_message - // skip qts_ack + // skip promise // TODO decrypted_message_layer = secret_api::decryptedMessageLayer::fetch(parser); @@ -292,12 +296,11 @@ class InboundSecretMessage : public SecretChatLogEventBase } StringBuilder &print(StringBuilder &sb) const override { - return sb << "[Logevent InboundSecretMessage " << tag("id", logevent_id()) << tag("qts", qts) - << tag("chat_id", chat_id) << tag("date", date) << tag("auth_key_id", format::as_hex(auth_key_id)) - << tag("message_id", message_id) << tag("my_in_seq_no", my_in_seq_no) - << tag("my_out_seq_no", my_out_seq_no) << tag("his_in_seq_no", his_in_seq_no) - << tag("message", to_string(decrypted_message_layer)) << tag("is_pending", is_pending) - << format::cond(has_encrypted_file, tag("file", file)) << "]"; + return sb << "[Logevent InboundSecretMessage " << tag("id", logevent_id()) << tag("chat_id", chat_id) + << tag("date", date) << tag("auth_key_id", format::as_hex(auth_key_id)) << tag("message_id", message_id) + << tag("my_in_seq_no", my_in_seq_no) << tag("my_out_seq_no", my_out_seq_no) + << tag("his_in_seq_no", his_in_seq_no) << tag("message", to_string(decrypted_message_layer)) + << tag("is_pending", is_pending) << format::cond(has_encrypted_file, tag("file", file)) << "]"; } }; diff --git a/td/telegram/net/DcAuthManager.cpp b/td/telegram/net/DcAuthManager.cpp index 8fa463eb9..c370fcf92 100644 --- a/td/telegram/net/DcAuthManager.cpp +++ b/td/telegram/net/DcAuthManager.cpp @@ -165,7 +165,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) { switch (dc.state) { case DcInfo::State::Waiting: { // wait for timeout - // break; + // break; } case DcInfo::State::Export: { // send auth.exportAuthorization to auth_dc diff --git a/td/telegram/net/NetQuery.cpp b/td/telegram/net/NetQuery.cpp index 1a1f7be7b..e02565093 100644 --- a/td/telegram/net/NetQuery.cpp +++ b/td/telegram/net/NetQuery.cpp @@ -62,43 +62,4 @@ void NetQuery::set_error(Status status, string source) { set_error_impl(std::move(status), std::move(source)); } -TsList &NetQuery::get_net_query_list() { - static auto init_mutex = [] { - TsList::lock().unlock(); // initialize mutex before any NetQuery - return true; - }(); - CHECK(init_mutex); - static TsList net_query_list; - return net_query_list; -} - -void dump_pending_network_queries() { - auto n = NetQueryCounter::get_count(); - LOG(WARNING) << tag("pending net queries", n); - - decltype(n) i = 0; - bool was_gap = false; - auto &net_query_list = NetQuery::get_net_query_list(); - auto guard = net_query_list.lock(); - for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) { - if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) { - if (was_gap) { - LOG(WARNING) << "..."; - was_gap = false; - } - const NetQueryDebug &debug = cur->get_data_unsafe(); - const NetQuery &nq = *static_cast(cur); - LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_)) - << tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_)) - << tag("state", debug.state_) - << tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_)) - << tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_) - << tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_) - << tag("unknown", debug.unknown_state_); - } else { - was_gap = true; - } - } -} - } // namespace td diff --git a/td/telegram/net/NetQuery.h b/td/telegram/net/NetQuery.h index 39cc73aaf..e62768787 100644 --- a/td/telegram/net/NetQuery.h +++ b/td/telegram/net/NetQuery.h @@ -8,6 +8,7 @@ #include "td/telegram/net/DcId.h" #include "td/telegram/net/NetQueryCounter.h" +#include "td/telegram/net/NetQueryStats.h" #include "td/actor/actor.h" #include "td/actor/PromiseFuture.h" @@ -40,18 +41,6 @@ class NetQueryCallback : public Actor { virtual void on_result_resendable(NetQueryPtr query, Promise promise); }; -struct NetQueryDebug { - double start_timestamp_ = 0; - int32 my_id_ = 0; - int32 resend_count_ = 0; - string state_ = "empty"; - double state_timestamp_ = 0; - int32 state_change_count_ = 0; - int32 send_failed_count_ = 0; - int ack_state_ = 0; - bool unknown_state_ = false; -}; - class NetQuery : public TsListNode { public: NetQuery() = default; @@ -235,7 +224,7 @@ class NetQuery : public TsListNode { *this = NetQuery(); } bool empty() const { - return state_ == State::Empty || nq_counter_.empty() || may_be_lost_; + return state_ == State::Empty || !nq_counter_ || may_be_lost_; } void stop_track() { @@ -250,14 +239,14 @@ class NetQuery : public TsListNode { void debug(string state, bool may_be_lost = false) { may_be_lost_ = may_be_lost; + VLOG(net_query) << *this << " " << tag("state", state); { auto guard = lock(); auto &data = get_data_unsafe(); - data.state_ = state; + data.state_ = std::move(state); data.state_timestamp_ = Time::now(); data.state_change_count_++; } - VLOG(net_query) << *this << " " << tag("state", state); } void set_callback(ActorShared callback) { @@ -277,10 +266,6 @@ class NetQuery : public TsListNode { finish_migrate(cancel_slot_); } - static int32 tl_magic(const BufferSlice &buffer_slice); - - static TsList &get_net_query_list(); - private: State state_ = State::Empty; Type type_ = Type::Common; @@ -332,6 +317,8 @@ class NetQuery : public TsListNode { static int32 get_my_id(); + static int32 tl_magic(const BufferSlice &buffer_slice); + public: double next_timeout_ = 1; // for NetQueryDelayer double total_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher @@ -345,13 +332,12 @@ class NetQuery : public TsListNode { int32 file_type_ = -1; // to be set by caller NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag, - GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit) + GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit, NetQueryStats *stats) : state_(state) , type_(type) , auth_flag_(auth_flag) , gzip_flag_(gzip_flag) , dc_id_(dc_id) - , nq_counter_(true) , status_() , id_(id) , query_(std::move(query)) @@ -361,7 +347,9 @@ class NetQuery : public TsListNode { get_data_unsafe().my_id_ = get_my_id(); get_data_unsafe().start_timestamp_ = Time::now(); LOG(INFO) << *this; - get_net_query_list().put(this); + if (stats) { + nq_counter_ = stats->register_query(this); + } } }; diff --git a/td/telegram/net/NetQueryCounter.cpp b/td/telegram/net/NetQueryCounter.cpp deleted file mode 100644 index eb4867931..000000000 --- a/td/telegram/net/NetQueryCounter.cpp +++ /dev/null @@ -1,13 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/telegram/net/NetQueryCounter.h" - -namespace td { - -std::atomic NetQueryCounter::net_query_cnt_{0}; - -} // namespace td diff --git a/td/telegram/net/NetQueryCounter.h b/td/telegram/net/NetQueryCounter.h index 79e9c7993..b3b39546b 100644 --- a/td/telegram/net/NetQueryCounter.h +++ b/td/telegram/net/NetQueryCounter.h @@ -9,48 +9,32 @@ #include "td/utils/common.h" #include +#include namespace td { class NetQueryCounter { - static std::atomic net_query_cnt_; - public: - static uint64 get_count() { - return net_query_cnt_.load(); + using Counter = std::atomic; + + NetQueryCounter() = default; + + explicit NetQueryCounter(Counter *counter) : ptr_(counter) { + CHECK(counter != nullptr); + counter->fetch_add(1, std::memory_order_relaxed); } - bool empty() const { - return !is_alive_; - } - - explicit NetQueryCounter(bool is_alive = false) : is_alive_(is_alive) { - if (is_alive) { - net_query_cnt_++; - } - } - - NetQueryCounter(const NetQueryCounter &other) = delete; - NetQueryCounter &operator=(const NetQueryCounter &other) = delete; - NetQueryCounter(NetQueryCounter &&other) : is_alive_(other.is_alive_) { - other.is_alive_ = false; - } - NetQueryCounter &operator=(NetQueryCounter &&other) { - if (is_alive_) { - net_query_cnt_--; - } - is_alive_ = other.is_alive_; - other.is_alive_ = false; - return *this; - } - ~NetQueryCounter() { - if (is_alive_) { - net_query_cnt_--; - } + explicit operator bool() const { + return static_cast(ptr_); } private: - bool is_alive_; + struct Deleter { + void operator()(Counter *ptr) { + ptr->fetch_sub(1, std::memory_order_relaxed); + } + }; + std::unique_ptr ptr_; }; } // namespace td diff --git a/td/telegram/net/NetQueryCreator.cpp b/td/telegram/net/NetQueryCreator.cpp index 85c088458..dd399a340 100644 --- a/td/telegram/net/NetQueryCreator.cpp +++ b/td/telegram/net/NetQueryCreator.cpp @@ -32,7 +32,7 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun LOG_CHECK(real_size == slice.size()) << real_size << " " << slice.size() << " " << format::as_hex_dump<4>(Slice(slice.as_slice())); - int32 tl_constructor = NetQuery::tl_magic(slice); + int32 tl_constructor = function.get_id(); size_t MIN_GZIPPED_SIZE = 128; auto gzip_flag = slice.size() < MIN_GZIPPED_SIZE ? NetQuery::GzipFlag::Off : NetQuery::GzipFlag::On; @@ -62,13 +62,14 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun if (auth_manager != nullptr && auth_manager->is_bot()) { total_timeout_limit = 8; } - if ((auth_manager == nullptr || !auth_manager->was_authorized()) && auth_flag == NetQuery::AuthFlag::On) { + if ((auth_manager == nullptr || !auth_manager->was_authorized()) && auth_flag == NetQuery::AuthFlag::On && + tl_constructor != telegram_api::auth_exportAuthorization::ID) { LOG(ERROR) << "Send query before authorization: " << to_string(function); } } } auto query = object_pool_.create(NetQuery::State::Query, id, std::move(slice), BufferSlice(), dc_id, type, auth_flag, - gzip_flag, tl_constructor, total_timeout_limit); + gzip_flag, tl_constructor, total_timeout_limit, net_query_stats_.get()); query->set_cancellation_token(query.generation()); return query; } diff --git a/td/telegram/net/NetQueryCreator.h b/td/telegram/net/NetQueryCreator.h index ba4e6b8fa..d8bd3debd 100644 --- a/td/telegram/net/NetQueryCreator.h +++ b/td/telegram/net/NetQueryCreator.h @@ -8,11 +8,14 @@ #include "td/telegram/net/DcId.h" #include "td/telegram/net/NetQuery.h" +#include "td/telegram/net/NetQueryStats.h" #include "td/telegram/UniqueId.h" #include "td/utils/buffer.h" #include "td/utils/ObjectPool.h" +#include + namespace td { namespace telegram_api { @@ -21,7 +24,8 @@ class Function; class NetQueryCreator { public: - NetQueryCreator() { + explicit NetQueryCreator(std::shared_ptr net_query_stats = {}) { + net_query_stats_ = std::move(net_query_stats); object_pool_.set_check_empty(true); } @@ -31,7 +35,8 @@ class NetQueryCreator { NetQueryPtr create_update(BufferSlice &&buffer) { return object_pool_.create(NetQuery::State::OK, 0, BufferSlice(), std::move(buffer), DcId::main(), - NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0); + NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0, + net_query_stats_.get()); } NetQueryPtr create(const telegram_api::Function &function, DcId dc_id = DcId::main(), @@ -45,6 +50,7 @@ class NetQueryCreator { NetQuery::AuthFlag auth_flag); private: + std::shared_ptr net_query_stats_; ObjectPool object_pool_; }; diff --git a/td/telegram/net/NetQueryStats.cpp b/td/telegram/net/NetQueryStats.cpp new file mode 100644 index 000000000..f332a7965 --- /dev/null +++ b/td/telegram/net/NetQueryStats.cpp @@ -0,0 +1,52 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/net/NetQueryStats.h" + +#include "td/telegram/net/NetQuery.h" + +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/Time.h" + +namespace td { + +uint64 NetQueryStats::get_count() const { + return count_.load(std::memory_order_relaxed); +} + +void NetQueryStats::dump_pending_network_queries() { + auto n = get_count(); + LOG(WARNING) << tag("pending net queries", n); + + if (!use_list_) { + return; + } + decltype(n) i = 0; + bool was_gap = false; + auto &net_query_list = list_; + auto guard = net_query_list.lock(); + for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) { + if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) { + if (was_gap) { + LOG(WARNING) << "..."; + was_gap = false; + } + const NetQueryDebug &debug = cur->get_data_unsafe(); + const NetQuery &nq = *static_cast(cur); + LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_)) + << tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_)) + << tag("state", debug.state_) + << tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_)) + << tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_) + << tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_) + << tag("unknown", debug.unknown_state_); + } else { + was_gap = true; + } + } +} +} // namespace td diff --git a/td/telegram/net/NetQueryStats.h b/td/telegram/net/NetQueryStats.h new file mode 100644 index 000000000..c8a7ff469 --- /dev/null +++ b/td/telegram/net/NetQueryStats.h @@ -0,0 +1,49 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/telegram/net/NetQueryCounter.h" + +#include "td/utils/common.h" +#include "td/utils/TsList.h" + +#include + +namespace td { + +struct NetQueryDebug { + double start_timestamp_ = 0; + int32 my_id_ = 0; + int32 resend_count_ = 0; + string state_ = "empty"; + double state_timestamp_ = 0; + int32 state_change_count_ = 0; + int32 send_failed_count_ = 0; + int ack_state_ = 0; + bool unknown_state_ = false; +}; + +class NetQueryStats { + public: + NetQueryCounter register_query(TsListNode *query) { + if (use_list_.load(std::memory_order_relaxed)) { + list_.put(query); + } + return NetQueryCounter(&count_); + } + + uint64 get_count() const; + + void dump_pending_network_queries(); + + private: + NetQueryCounter::Counter count_; + std::atomic use_list_{true}; + TsList list_; +}; + +} // namespace td diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index b0cd89b62..d754adbf2 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -1070,7 +1070,8 @@ void Session::connection_open_finish(ConnectionInfo *info, info->state = ConnectionInfo::State::Ready; info->created_at = Time::now_cached(); info->wakeup_at = Time::now_cached() + 10; - if (unknown_queries_.size() > 1024) { + if (unknown_queries_.size() > MAX_INFLIGHT_QUERIES) { + LOG(ERROR) << "With current limits `Too much queries with unknown state` error must be impossible"; on_session_failed(Status::Error("Too much queries with unknown state")); return; } @@ -1311,12 +1312,12 @@ void Session::loop() { while (main_connection_.state == ConnectionInfo::State::Ready) { if (auth_data_.is_ready(Time::now_cached())) { if (need_send_query()) { - while (!pending_queries_.empty()) { + while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) { auto &query = pending_queries_.front(); connection_send_query(&main_connection_, std::move(query)); pending_queries_.pop_front(); + need_flush = true; } - need_flush = true; } if (need_send_bind_key()) { // send auth.bindTempAuthKey diff --git a/td/telegram/net/Session.h b/td/telegram/net/Session.h index 128aa1e4f..b7e292454 100644 --- a/td/telegram/net/Session.h +++ b/td/telegram/net/Session.h @@ -157,6 +157,7 @@ class Session final bool close_flag_ = false; static constexpr double ACTIVITY_TIMEOUT = 60 * 5; + static constexpr size_t MAX_INFLIGHT_QUERIES = 1024; struct ContainerInfo { size_t ref_cnt; diff --git a/td/tl/TlObject.h b/td/tl/TlObject.h index f4ebb2624..51b69f0dc 100644 --- a/td/tl/TlObject.h +++ b/td/tl/TlObject.h @@ -13,7 +13,6 @@ #include #include -#include #include #include #include diff --git a/tdactor/td/actor/SchedulerLocalStorage.h b/tdactor/td/actor/SchedulerLocalStorage.h index 16c0b6a99..61a570873 100644 --- a/tdactor/td/actor/SchedulerLocalStorage.h +++ b/tdactor/td/actor/SchedulerLocalStorage.h @@ -46,6 +46,10 @@ class LazySchedulerLocalStorage { LazySchedulerLocalStorage() = default; explicit LazySchedulerLocalStorage(std::function create_func) : create_func_(std::move(create_func)) { } + void set_create_func(std::function create_func) { + CHECK(!create_func_); + create_func_ = create_func; + } T &get() { auto &optional_value_ = sls_optional_value_.get(); diff --git a/tdutils/td/utils/MemoryLog.h b/tdutils/td/utils/MemoryLog.h index 108e62c0d..c762d28bc 100644 --- a/tdutils/td/utils/MemoryLog.h +++ b/tdutils/td/utils/MemoryLog.h @@ -20,6 +20,9 @@ template class MemoryLog : public LogInterface { static constexpr size_t MAX_OUTPUT_SIZE = buffer_size / 16 < (8 << 10) ? buffer_size / 16 : (8 << 10); + static_assert((buffer_size & (buffer_size - 1)) == 0, "Buffer size must be power of 2"); + static_assert(buffer_size >= (8 << 10), "Too small buffer size"); + public: MemoryLog() { std::memset(buffer_, ' ', sizeof(buffer_)); @@ -43,13 +46,13 @@ class MemoryLog : public LogInterface { uint32 end_pos = start_pos + total_size; if (likely(end_pos <= buffer_size)) { std::memcpy(&buffer_[start_pos + MAGIC_SIZE], slice.data(), slice_size); - std::memcpy(&buffer_[start_pos + MAGIC_SIZE + slice_size], " ", pad_size); + std::memcpy(&buffer_[start_pos + MAGIC_SIZE + slice_size], " ", pad_size); } else { size_t first = buffer_size - start_pos - MAGIC_SIZE; size_t second = slice_size - first; std::memcpy(&buffer_[start_pos + MAGIC_SIZE], slice.data(), first); std::memcpy(&buffer_[0], slice.data() + first, second); - std::memcpy(&buffer_[second], " ", pad_size); + std::memcpy(&buffer_[second], " ", pad_size); } CHECK((start_pos & 15) == 0); diff --git a/tdutils/td/utils/TsFileLog.cpp b/tdutils/td/utils/TsFileLog.cpp index e8f75c964..bc5b04ff3 100644 --- a/tdutils/td/utils/TsFileLog.cpp +++ b/tdutils/td/utils/TsFileLog.cpp @@ -40,9 +40,6 @@ class TsFileLog : public LogInterface { return res; } - void append(CSlice cslice) override { - return append(cslice, -1); - } void append(CSlice cslice, int log_level) override { get_current_logger()->append(cslice, log_level); } diff --git a/tdutils/td/utils/TsList.h b/tdutils/td/utils/TsList.h index 064f1265b..c9adaf758 100644 --- a/tdutils/td/utils/TsList.h +++ b/tdutils/td/utils/TsList.h @@ -180,9 +180,8 @@ class TsList : public TsListNode { } this->parent = nullptr; } - static std::unique_lock lock() TD_WARN_UNUSED_RESULT { - static std::mutex mutex; - return std::unique_lock(mutex); + std::unique_lock lock() TD_WARN_UNUSED_RESULT { + return std::unique_lock(mutex_); } TsListNode *begin() { return this->get_next(); @@ -198,10 +197,16 @@ class TsList : public TsListNode { } return res; } + + private: + std::mutex mutex_; }; template std::unique_lock TsListNode::lock() { + if (parent == nullptr) { + return {}; + } CHECK(parent != nullptr); return parent->lock(); } diff --git a/tdutils/td/utils/logging.cpp b/tdutils/td/utils/logging.cpp index ffc03bcbe..46a5dbc60 100644 --- a/tdutils/td/utils/logging.cpp +++ b/tdutils/td/utils/logging.cpp @@ -56,21 +56,34 @@ Logger::Logger(LogInterface &log, const LogOptions &options, int log_level, Slic // log level sb_ << '['; - if (log_level < 10) { - sb_ << ' '; + if (static_cast(log_level) < 10) { + sb_ << ' ' << static_cast('0' + log_level); + } else { + sb_ << log_level; } - sb_ << log_level << ']'; + sb_ << ']'; // thread id auto thread_id = get_thread_id(); sb_ << "[t"; - if (thread_id < 10) { - sb_ << ' '; + if (static_cast(thread_id) < 10) { + sb_ << ' ' << static_cast('0' + thread_id); + } else { + sb_ << thread_id; } - sb_ << thread_id << ']'; + sb_ << ']'; // timestamp - sb_ << '[' << StringBuilder::FixedDouble(Clocks::system(), 9) << ']'; + auto time = Clocks::system(); + auto unix_time = static_cast(time); + auto nanoseconds = static_cast((time - unix_time) * 1e9); + sb_ << '[' << unix_time << '.'; + uint32 limit = 100000000; + while (nanoseconds < limit && limit > 1) { + sb_ << '0'; + limit /= 10; + } + sb_ << nanoseconds << ']'; // file : line if (!file_name.empty()) { @@ -79,7 +92,7 @@ Logger::Logger(LogInterface &log, const LogOptions &options, int log_level, Slic last_slash_--; } file_name = file_name.substr(last_slash_ + 1); - sb_ << "[" << file_name << ':' << line_num << ']'; + sb_ << '[' << file_name << ':' << static_cast(line_num) << ']'; } // context from tag_ diff --git a/tdutils/td/utils/logging.h b/tdutils/td/utils/logging.h index aa5bab42f..94a20c13e 100644 --- a/tdutils/td/utils/logging.h +++ b/tdutils/td/utils/logging.h @@ -182,14 +182,12 @@ class LogInterface { LogInterface(LogInterface &&) = delete; LogInterface &operator=(LogInterface &&) = delete; virtual ~LogInterface() = default; - virtual void append(CSlice slice) { - append(slice, -1); - } - virtual void append(CSlice slice, int /*log_level*/) { - append(slice); - } + + virtual void append(CSlice slice, int log_level) = 0; + virtual void rotate() { } + virtual vector get_file_paths() { return {}; } diff --git a/tdutils/test/log.cpp b/tdutils/test/log.cpp index b76f95579..de56c2d17 100644 --- a/tdutils/test/log.cpp +++ b/tdutils/test/log.cpp @@ -8,6 +8,7 @@ #include "td/utils/FileLog.h" #include "td/utils/format.h" #include "td/utils/logging.h" +#include "td/utils/MemoryLog.h" #include "td/utils/port/path.h" #include "td/utils/port/thread.h" #include "td/utils/Slice.h" @@ -23,11 +24,15 @@ char disable_linker_warning_about_empty_file_tdutils_test_log_cpp TD_UNUSED; template class LogBenchmark : public td::Benchmark { public: - LogBenchmark(std::string name, int threads_n, std::function()> creator) - : name_(std::move(name)), threads_n_(threads_n), creator_(std::move(creator)) { + LogBenchmark(std::string name, int threads_n, bool test_full_logging, std::function()> creator) + : name_(std::move(name)) + , threads_n_(threads_n) + , test_full_logging_(test_full_logging) + , creator_(std::move(creator)) { } std::string get_description() const override { - return PSTRING() << name_ << " " << td::tag("threads_n", threads_n_); + return PSTRING() << name_ << " " << (test_full_logging_ ? "ERROR" : "PLAIN") << " " + << td::tag("threads_n", threads_n_); } void start_up() override { log_ = creator_(); @@ -40,12 +45,17 @@ class LogBenchmark : public td::Benchmark { log_.reset(); } void run(int n) override { + auto old_log_interface = td::log_interface; + td::log_interface = log_.get(); + for (auto &thread : threads_) { thread = td::thread([this, n] { this->run_thread(n); }); } for (auto &thread : threads_) { thread.join(); } + + td::log_interface = old_log_interface; } void run_thread(int n) { @@ -54,27 +64,41 @@ class LogBenchmark : public td::Benchmark { if (i % 10000 == 0) { log_->rotate(); } - log_->append(str); + if (test_full_logging_) { + LOG(ERROR) << str; + } else { + LOG(PLAIN) << str; + } } } private: std::string name_; - td::unique_ptr log_; + td::unique_ptr log_; int threads_n_{0}; + bool test_full_logging_{false}; std::function()> creator_; std::vector threads_; }; template -static void bench_log(std::string name, int threads_n, F &&f) { - bench(LogBenchmark(std::move(name), threads_n, std::move(f))); +static void bench_log(std::string name, F &&f) { + for (auto test_full_logging : {false, true}) { + for (auto threads_n : {1, 4, 8}) { + bench(LogBenchmark(name, threads_n, test_full_logging, f)); + } + } }; -TEST(Log, TsLogger) { - bench_log("NewTsFileLog", 4, +TEST(Log, Bench) { + bench_log("NullLog", [] { return td::make_unique(); }); + + bench_log("MemoryLog", [] { return td::make_unique>(); }); + + bench_log("TsFileLog", [] { return td::TsFileLog::create("tmplog", std::numeric_limits::max(), false).move_as_ok(); }); - bench_log("TsFileLog", 8, [] { + + bench_log("FileLog + TsLog", [] { class FileLog : public td::LogInterface { public: FileLog() { @@ -83,8 +107,8 @@ TEST(Log, TsLogger) { } ~FileLog() { } - void append(td::CSlice slice) override { - ts_log_.append(slice, -1); + void append(td::CSlice slice, int log_level) override { + ts_log_.append(slice, log_level); } std::vector get_file_paths() override { return file_log_.get_file_paths(); @@ -97,16 +121,7 @@ TEST(Log, TsLogger) { return td::make_unique(); }); - bench_log("noop", 4, [] { - class NoopLog : public td::LogInterface { - public: - void append(td::CSlice slice) override { - } - }; - return td::make_unique(); - }); - - bench_log("FileLog", 4, [] { + bench_log("FileLog", [] { class FileLog : public td::LogInterface { public: FileLog() { @@ -114,8 +129,8 @@ TEST(Log, TsLogger) { } ~FileLog() { } - void append(td::CSlice slice) override { - file_log_.append(slice, -1); + void append(td::CSlice slice, int log_level) override { + file_log_.append(slice, log_level); } std::vector get_file_paths() override { return file_log_.get_file_paths(); diff --git a/tdutils/test/misc.cpp b/tdutils/test/misc.cpp index 144b52535..cc62bdb37 100644 --- a/tdutils/test/misc.cpp +++ b/tdutils/test/misc.cpp @@ -433,7 +433,6 @@ static void test_to_double() { TEST(Misc, to_double) { test_to_double(); const char *locale_name = (std::setlocale(LC_ALL, "fr-FR") == nullptr ? "C" : "fr-FR"); - LOG(ERROR) << locale_name; std::locale new_locale(locale_name); auto host_locale = std::locale::global(new_locale); test_to_double(); diff --git a/tdutils/test/port.cpp b/tdutils/test/port.cpp index ddeb6ff77..d4afe6a87 100644 --- a/tdutils/test/port.cpp +++ b/tdutils/test/port.cpp @@ -183,7 +183,7 @@ TEST(Post, SignalsAndThread) { } CHECK(ptrs == ans); - LOG(ERROR) << ptrs; + //LOG(ERROR) << ptrs; //LOG(ERROR) << std::set(addrs.begin(), addrs.end()).size(); //LOG(ERROR) << addrs; } diff --git a/test/secret.cpp b/test/secret.cpp index a5cf8f61c..1a7f94cd2 100644 --- a/test/secret.cpp +++ b/test/secret.cpp @@ -588,11 +588,10 @@ class Master : public Actor { void add_inbound_message(int32 chat_id, BufferSlice data, uint64 crc) { CHECK(crc64(data.as_slice()) == crc); auto event = make_unique(); - event->qts = 0; event->chat_id = chat_id; event->date = 0; event->encrypted_message = std::move(data); - event->qts_ack = PromiseCreator::lambda( + event->promise = PromiseCreator::lambda( [actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable { if (result.is_ok()) { LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc); diff --git a/test/tdclient.cpp b/test/tdclient.cpp index ed9e64823..44d14baaf 100644 --- a/test/tdclient.cpp +++ b/test/tdclient.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include REGISTER_TESTS(tdclient); @@ -875,7 +876,7 @@ TEST(Client, SimpleMulti) { #if !TD_THREAD_UNSUPPORTED TEST(Client, Multi) { - std::vector threads; + td::vector threads; for (int i = 0; i < 4; i++) { threads.emplace_back([] { for (int i = 0; i < 1000; i++) { @@ -895,6 +896,32 @@ TEST(Client, Multi) { thread.join(); } } + +TEST(Client, MultiNew) { + td::vector threads; + td::MultiClient client; + int threads_n = 4; + int clients_n = 1000; + for (int i = 0; i < threads_n; i++) { + threads.emplace_back([&] { + for (int i = 0; i < clients_n; i++) { + auto id = client.create_client(); + client.send(id, 3, td::make_tl_object(3)); + } + }); + } + for (auto &thread : threads) { + thread.join(); + } + + std::set ids; + while (ids.size() != static_cast(threads_n) * clients_n) { + auto event = client.receive(10); + if (event.client_id != 0 && event.id == 3) { + ids.insert(event.client_id); + } + } +} #endif TEST(PartsManager, hands) {