// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/telegram/Client.h" #include "td/telegram/Td.h" #include "td/telegram/TdCallback.h" #include "td/actor/actor.h" #include "td/utils/common.h" #include "td/utils/crypto.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/MpscPollableQueue.h" #include "td/utils/port/RwMutex.h" #include "td/utils/port/thread.h" #include "td/utils/death_handler.h" #include #include #include #include #include #include namespace td { class MultiTd : public Actor { public: explicit MultiTd(Td::Options options) : options_(std::move(options)) { } void create(int32 td_id, unique_ptr callback) { auto &td = tds_[td_id]; CHECK(td.empty()); string name = "Td"; auto context = std::make_shared(); auto old_context = set_context(context); auto old_tag = set_tag(to_string(td_id)); td = create_actor("Td", std::move(callback), options_); set_context(old_context); set_tag(old_tag); } void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { auto &td = tds_[client_id]; CHECK(!td.empty()); send_closure(td, &Td::request, request_id, std::move(function)); } void close(int32 td_id) { auto size = tds_.erase(td_id); CHECK(size == 1); } private: Td::Options options_; std::unordered_map> tds_; }; #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED class TdReceiver { public: MultiClient::Response receive(double timeout) { return receive(timeout, true, true); } MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) { if (include_responses && !responses_.empty()) { auto result = std::move(responses_.front()); responses_.pop(); return result; } if (include_updates && !updates_.empty()) { auto result = std::move(updates_.front()); updates_.pop(); return result; } return {0, 0, nullptr}; } unique_ptr create_callback(MultiClient::ClientId client_id) { class Callback : public TdCallback { public: Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { } void on_result(uint64 id, td_api::object_ptr result) override { if (id == 0) { impl_->responses_.push({client_id_, id, nullptr}); impl_->updates_.push({client_id_, 0, std::move(result)}); } else { impl_->responses_.push({client_id_, id, std::move(result)}); impl_->updates_.push({client_id_, id, nullptr}); } } void on_error(uint64 id, td_api::object_ptr error) override { if (id == 0) { impl_->responses_.push({client_id_, 0, nullptr}); impl_->updates_.push({client_id_, 0, std::move(error)}); } else { impl_->responses_.push({client_id_, id, std::move(error)}); impl_->updates_.push({client_id_, id, nullptr}); } } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; ~Callback() override { impl_->responses_.push({client_id_, 0, nullptr}); impl_->updates_.push({client_id_, 0, nullptr}); } private: MultiClient::ClientId client_id_; TdReceiver *impl_; }; return td::make_unique(client_id, this); } private: std::queue updates_; std::queue responses_; }; class MultiClient::Impl final { public: Impl() { options_.net_query_stats = std::make_shared(); concurrent_scheduler_ = make_unique(); concurrent_scheduler_->init(0); receiver_ = make_unique(); concurrent_scheduler_->start(); } ClientId create_client() { auto client_id = ++client_id_; tds_[client_id] = concurrent_scheduler_->create_actor_unsafe(0, "Td", receiver_->create_callback(client_id), options_); return client_id; } void send(ClientId client_id, RequestId request_id, Function function) { Request request; request.client_id = client_id; request.id = request_id; request.function = std::move(function); requests_.push_back(std::move(request)); } Response receive(double timeout) { return receive(timeout, true, true); } Response receive(double timeout, bool include_responses, bool include_updates) { if (!requests_.empty()) { auto guard = concurrent_scheduler_->get_main_guard(); for (auto &request : requests_) { auto &td = tds_[request.client_id]; CHECK(!td.empty()); send_closure_later(td, &Td::request, request.id, std::move(request.function)); } requests_.clear(); } auto response = receiver_->receive(0, include_responses, include_updates); if (response.client_id == 0) { concurrent_scheduler_->run_main(0); response = receiver_->receive(0, include_responses, include_updates); } else { ConcurrentScheduler::emscripten_clear_main_timeout(); } if (response.client_id != 0 && !response.object) { auto guard = concurrent_scheduler_->get_main_guard(); tds_.erase(response.client_id); } return response; } Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { { auto guard = concurrent_scheduler_->get_main_guard(); for (auto &td : tds_) { td.second = {}; } } while (!tds_.empty()) { receive(10); } concurrent_scheduler_->finish(); } private: unique_ptr receiver_; struct Request { ClientId client_id; RequestId id; Function function; }; std::vector requests_; unique_ptr concurrent_scheduler_; ClientId client_id_{0}; Td::Options options_; std::unordered_map> tds_; }; class Client::Impl final { public: Impl() { client_id_ = impl_.create_client(); } void send(Request request) { impl_.send(client_id_, request.id, std::move(request.function)); } Response receive(double timeout) { return receive(timeout, true, true); } Response receive(double timeout, bool include_responses, bool include_updates) { auto response = impl_.receive(timeout, include_responses, include_updates); Response old_response; old_response.id = response.id; old_response.object = std::move(response.object); return old_response; } private: MultiClient::Impl impl_; MultiClient::ClientId client_id_; }; #else class TdReceiver { public: TdReceiver() { output_responses_queue_ = std::make_shared(); output_responses_queue_->init(); output_updates_queue_ = std::make_shared(); output_updates_queue_->init(); } MultiClient::Response receive(double timeout) { return receive(timeout, true, true); } MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) { VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout; bool is_responses_locked = false; bool is_updates_locked = false; if (include_responses) { is_responses_locked = receive_responses_lock_.exchange(true); CHECK(!is_responses_locked); } if (include_updates) { is_updates_locked = receive_updates_lock_.exchange(true); CHECK(!is_updates_locked); } auto response = receive_unlocked(timeout, include_responses, include_updates); if (include_updates) { is_updates_locked = receive_updates_lock_.exchange(false); CHECK(is_updates_locked); } if (include_responses) { is_responses_locked = receive_responses_lock_.exchange(false); CHECK(is_responses_locked); } VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get(); return response; } unique_ptr create_callback(MultiClient::ClientId client_id) { class Callback : public TdCallback { public: explicit Callback(MultiClient::ClientId client_id, std::shared_ptr output_responses_queue, std::shared_ptr output_updates_queue) : client_id_(client_id), output_responses_queue_(std::move(output_responses_queue)), output_updates_queue_(std::move(output_updates_queue)) { } void on_result(uint64 id, td_api::object_ptr result) override { if (id == 0) { output_responses_queue_->writer_put({client_id_, id, nullptr}); output_updates_queue_->writer_put({client_id_, 0, std::move(result)}); } else { output_responses_queue_->writer_put({client_id_, id, std::move(result)}); output_updates_queue_->writer_put({client_id_, id, nullptr}); } } void on_error(uint64 id, td_api::object_ptr error) override { if (id == 0) { output_responses_queue_->writer_put({client_id_, id, nullptr}); output_updates_queue_->writer_put({client_id_, 0, std::move(error)}); } else { output_responses_queue_->writer_put({client_id_, id, std::move(error)}); output_updates_queue_->writer_put({client_id_, id, nullptr}); } } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; ~Callback() override { output_updates_queue_->writer_put({client_id_, 0, nullptr}); output_responses_queue_->writer_put({client_id_, 0, nullptr}); } private: MultiClient::ClientId client_id_; std::shared_ptr output_responses_queue_; std::shared_ptr output_updates_queue_; }; return td::make_unique(client_id, output_responses_queue_, output_updates_queue_); } private: using OutputQueue = MpscPollableQueue; std::shared_ptr output_responses_queue_; std::shared_ptr output_updates_queue_; int output_responses_queue_ready_cnt_{0}; int output_updates_queue_ready_cnt_{0}; std::atomic receive_responses_lock_{false}; std::atomic receive_updates_lock_{false}; MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) { if (include_responses) { if (output_responses_queue_ready_cnt_ == 0) { output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock(); } if (output_responses_queue_ready_cnt_ > 0) { output_responses_queue_ready_cnt_--; return output_responses_queue_->reader_get_unsafe(); } } if (include_updates) { if (output_updates_queue_ready_cnt_ == 0) { output_updates_queue_ready_cnt_ = output_updates_queue_->reader_wait_nonblock(); } if (output_updates_queue_ready_cnt_ > 0) { output_updates_queue_ready_cnt_--; return output_updates_queue_->reader_get_unsafe(); } } if (timeout != 0) { if (include_responses && !include_updates) { output_responses_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); } else if (!include_responses && include_updates) { output_updates_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); } else if (include_responses && include_updates) { output_updates_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); } else { // This configuration (include_responses = false and include_updates = false) shouldn't be used. output_updates_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); } return receive_unlocked(0, include_responses, include_updates); } return {0, 0, nullptr}; } }; class MultiImpl { public: explicit MultiImpl(std::shared_ptr net_query_stats) { concurrent_scheduler_ = std::make_shared(); concurrent_scheduler_->init(3); concurrent_scheduler_->start(); { auto guard = concurrent_scheduler_->get_main_guard(); Td::Options options; options.net_query_stats = std::move(net_query_stats); multi_td_ = create_actor("MultiTd", std::move(options)); } scheduler_thread_ = thread([concurrent_scheduler = concurrent_scheduler_] { while (concurrent_scheduler->run_main(10)) { } }); } MultiImpl(const MultiImpl &) = delete; MultiImpl &operator=(const MultiImpl &) = delete; MultiImpl(MultiImpl &&) = delete; MultiImpl &operator=(MultiImpl &&) = delete; int32 create(TdReceiver &receiver) { auto id = create_id(); create(id, receiver.create_callback(id)); return id; } void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { auto guard = concurrent_scheduler_->get_send_guard(); send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function)); } void close(int32 td_id) { auto guard = concurrent_scheduler_->get_send_guard(); send_closure(multi_td_, &MultiTd::close, td_id); } ~MultiImpl() { { auto guard = concurrent_scheduler_->get_send_guard(); multi_td_.reset(); Scheduler::instance()->finish(); } scheduler_thread_.join(); concurrent_scheduler_->finish(); } private: std::shared_ptr concurrent_scheduler_; thread scheduler_thread_; ActorOwn multi_td_; static int32 create_id() { static std::atomic current_id{1}; return current_id.fetch_add(1); } void create(int32 td_id, unique_ptr callback) { auto guard = concurrent_scheduler_->get_send_guard(); send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback)); } }; class MultiImplPool { public: std::shared_ptr get() { std::unique_lock lock(mutex_); if (impls_.empty()) { init_openssl_threads(); impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4); } auto &impl = *std::min_element(impls_.begin(), impls_.end(), [](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); }); auto res = impl.lock(); if (!res) { res = std::make_shared(net_query_stats_); impl = res; } return res; } private: std::mutex mutex_; std::vector> impls_; std::shared_ptr net_query_stats_ = std::make_shared(); }; class MultiClient::Impl final { public: ClientId create_client() { auto impl = pool_.get(); auto client_id = impl->create(*receiver_); { auto lock = impls_mutex_.lock_write().move_as_ok(); impls_[client_id] = std::move(impl); } return client_id; } void send(ClientId client_id, RequestId request_id, Function function) { auto lock = impls_mutex_.lock_read().move_as_ok(); auto it = impls_.find(client_id); CHECK(it != impls_.end()); it->second->send(client_id, request_id, std::move(function)); } Response receive(double timeout) { return receive(timeout, true, true); } Response receive(double timeout, bool include_responses, bool include_updates) { auto res = receiver_->receive(timeout, include_responses, include_updates); if (res.client_id != 0 && !res.object) { auto lock = impls_mutex_.lock_write().move_as_ok(); impls_.erase(res.client_id); } return res; } Impl() = default; Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { for (auto &it : impls_) { it.second->close(it.first); } while (!impls_.empty()) { receive(10, true, true); } } private: MultiImplPool pool_; RwMutex impls_mutex_; std::unordered_map> impls_; unique_ptr receiver_{make_unique()}; }; class Client::Impl final { public: Impl() { static MultiImplPool pool; multi_impl_ = pool.get(); receiver_ = make_unique(); td_id_ = multi_impl_->create(*receiver_); } void send(Client::Request request) { if (request.id == 0 || request.function == nullptr) { LOG(ERROR) << "Drop wrong request " << request.id; return; } multi_impl_->send(td_id_, request.id, std::move(request.function)); } Client::Response receive(double timeout) { return receive(timeout, true, true); } Client::Response receive(double timeout, bool include_responses, bool include_updates) { auto res = receiver_->receive(timeout, include_responses, include_updates); if (res.client_id != 0 && !res.object) { is_closed_ = true; } Client::Response old_res; old_res.id = res.id; old_res.object = std::move(res.object); return old_res; } Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { multi_impl_->close(td_id_); while (!is_closed_) { receive(10, true, true); } } private: std::shared_ptr multi_impl_; unique_ptr receiver_; bool is_closed_{false}; int32 td_id_; }; #endif Client::Client() : impl_(std::make_unique()) { } void Client::send(Request &&request) { impl_->send(std::move(request)); } Client::Response Client::receive(double timeout) { return impl_->receive(timeout); } Client::Response Client::receive(double timeout, bool include_responses, bool include_updates) { return impl_->receive(timeout, include_responses, include_updates); } Client::Response Client::execute(Request &&request) { Response response; response.id = request.id; response.object = Td::static_request(std::move(request.function)); return response; } Client::~Client() = default; Client::Client(Client &&other) = default; Client &Client::operator=(Client &&other) = default; MultiClient::MultiClient() : impl_(std::make_unique()) { } MultiClient::ClientId MultiClient::create_client() { return impl_->create_client(); } void MultiClient::send(ClientId client_id, RequestId request_id, Function &&function) { impl_->send(client_id, request_id, std::move(function)); } MultiClient::Response MultiClient::receive(double timeout) { return impl_->receive(timeout); } MultiClient::Response MultiClient::receive(double timeout, bool include_responses, bool include_updates) { return impl_->receive(timeout, include_responses, include_updates); } MultiClient::Object MultiClient::execute(Function &&function) { return Td::static_request(std::move(function)); } MultiClient::~MultiClient() = default; MultiClient::MultiClient(MultiClient &&other) = default; MultiClient &MultiClient::operator=(MultiClient &&other) = default; } // namespace td