From 5ac5acd1cb1f4f5f514cd8201f8c764de9a5f89c Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 8 Oct 2020 01:28:24 +0300 Subject: [PATCH] Fix ClientManager closing. GitOrigin-RevId: eb588d9991ea6c8c2b4a339d1396d58179c84f43 --- td/telegram/Client.cpp | 116 ++++++++++++++++++++++++++++------------- 1 file changed, 81 insertions(+), 35 deletions(-) diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index 593661905..aa8d298f9 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -92,20 +92,26 @@ class ClientManager::Impl final { } void send(ClientId client_id, RequestId request_id, td_api::object_ptr &&request) { - Request request; - request.client_id = client_id; - request.id = request_id; - request.request = std::move(request); - requests_.push_back(std::move(request)); + requests_.push_back({client_id, request_id, std::move(request)}); } Response receive(double timeout) { if (!requests_.empty()) { auto guard = concurrent_scheduler_->get_main_guard(); - for (auto &request : requests_) { - auto &td = tds_[request.client_id]; - CHECK(!td.empty()); - send_closure_later(td, &Td::request, request.id, std::move(request.request)); + for (size_t i = 0; i < requests_.size(); i++) { + auto &request = requests_[i]; + auto it = tds_.find(request.client_id); + if (it == tds_.end()) { + receiver_->add_response(request.client_id, request.id, + td_api::make_object(400, "Invalid TDLib instance specified")); + continue; + } + if (it->second.empty()) { + receiver_->add_response(request.client_id, request.id, + td_api::make_object(500, "Request aborted")); + continue; + } + send_closure_later(it->second, &Td::request, request.id, std::move(request.request)); } requests_.clear(); } @@ -117,9 +123,21 @@ class ClientManager::Impl final { } else { ConcurrentScheduler::emscripten_clear_main_timeout(); } + if (response.request_id == 0 && response.object != nullptr && + response.object->get_id() == td::td_api::updateAuthorizationState::ID && + static_cast(response.object.get()) + ->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) { + auto it = tds_.find(response.client_id); + CHECK(it != tds_.end()); + it->second.reset(); + } if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) { auto guard = concurrent_scheduler_->get_main_guard(); - tds_.erase(response.client_id); + auto it = tds_.find(response.client_id); + CHECK(it != tds_.end()); + CHECK(it->second.empty()); + tds_.erase(it); + response.client_id = 0; } return response; } @@ -132,7 +150,7 @@ class ClientManager::Impl final { { auto guard = concurrent_scheduler_->get_main_guard(); for (auto &td : tds_) { - td.second = {}; + td.second.reset(); } } while (!tds_.empty()) { @@ -148,7 +166,7 @@ class ClientManager::Impl final { RequestId id; td_api::object_ptr request; }; - td::vector requests_; + vector requests_; unique_ptr concurrent_scheduler_; ClientId client_id_{0}; Td::Options options_; @@ -157,18 +175,18 @@ class ClientManager::Impl final { class Client::Impl final { public: - Impl() { - client_id_ = impl_.create_client(); + Impl() : client_id_(impl_.create_client()) { } void send(Request request) { - impl_.send(client_id_, request.id, std::move(request.request)); + impl_.send(client_id_, request.id, std::move(request.function)); } Response receive(double timeout) { auto response = impl_.receive(timeout); + Response old_response; - old_response.id = response.id; + old_response.id = response.request_id; old_response.object = std::move(response.object); return old_response; } @@ -205,8 +223,8 @@ class MultiTd : public Actor { } void close(int32 td_id) { - // no check that td_id hasn't been deleted before - tds_.erase(td_id); + size_t erased = tds_.erase(td_id); + CHECK(erased > 0); } private: @@ -364,12 +382,12 @@ class MultiImplPool { } auto &impl = *std::min_element(impls_.begin(), impls_.end(), [](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); }); - auto res = impl.lock(); - if (!res) { - res = std::make_shared(net_query_stats_); - impl = res; + auto result = impl.lock(); + if (!result) { + result = std::make_shared(net_query_stats_); + impl = result; } - return res; + return result; } private: @@ -385,7 +403,7 @@ class ClientManager::Impl final { auto client_id = impl->create(*receiver_); { auto lock = impls_mutex_.lock_write().move_as_ok(); - impls_[client_id] = std::move(impl); + impls_[client_id].impl = std::move(impl); } return client_id; } @@ -398,18 +416,42 @@ class ClientManager::Impl final { td_api::make_object(400, "Invalid TDLib instance specified")); return; } - it->second->send(client_id, request_id, std::move(request)); + if (it->second.is_closed) { + receiver_->add_response(client_id, request_id, td_api::make_object(500, "Request aborted")); + return; + } + it->second.impl->send(client_id, request_id, std::move(request)); } Response receive(double timeout) { auto response = receiver_->receive(timeout); + if (response.request_id == 0 && response.object != nullptr && + response.object->get_id() == td::td_api::updateAuthorizationState::ID && + static_cast(response.object.get()) + ->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) { + auto lock = impls_mutex_.lock_write().move_as_ok(); + close_impl(response.client_id); + } if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) { auto lock = impls_mutex_.lock_write().move_as_ok(); - impls_.erase(response.client_id); + auto it = impls_.find(response.client_id); + CHECK(it != impls_.end()); + CHECK(it->second.is_closed); + impls_.erase(it); + response.client_id = 0; } return response; } + void close_impl(ClientId client_id) { + auto it = impls_.find(client_id); + CHECK(it != impls_.end()); + if (!it->second.is_closed) { + it->second.is_closed = true; + it->second.impl->close(client_id); + } + } + Impl() = default; Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; @@ -417,7 +459,7 @@ class ClientManager::Impl final { Impl &operator=(Impl &&) = delete; ~Impl() { for (auto &it : impls_) { - it.second->close(it.first); + close_impl(it.first); } while (!impls_.empty()) { receive(10); @@ -427,7 +469,11 @@ class ClientManager::Impl final { private: MultiImplPool pool_; RwMutex impls_mutex_; - std::unordered_map> impls_; + struct MultiImplInfo { + std::shared_ptr impl; + bool is_closed = false; + }; + std::unordered_map impls_; unique_ptr receiver_{make_unique()}; }; @@ -440,7 +486,7 @@ class Client::Impl final { td_id_ = multi_impl_->create(*receiver_); } - void send(Client::Request request) { + void send(Request request) { if (request.id == 0 || request.function == nullptr) { LOG(ERROR) << "Drop wrong request " << request.id; return; @@ -449,13 +495,13 @@ class Client::Impl final { multi_impl_->send(td_id_, request.id, std::move(request.function)); } - Client::Response receive(double timeout) { - auto res = receiver_->receive(timeout); + Response receive(double timeout) { + auto response = receiver_->receive(timeout); - Client::Response old_res; - old_res.id = res.request_id; - old_res.object = std::move(res.object); - return old_res; + Response old_response; + old_response.id = response.request_id; + old_response.object = std::move(response.object); + return old_response; } Impl(const Impl &) = delete;