// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/telegram/Client.h" #include "td/telegram/Td.h" #include "td/actor/actor.h" #include "td/utils/crypto.h" #include "td/utils/logging.h" #include "td/utils/MpscPollableQueue.h" #include "td/utils/Observer.h" #include "td/utils/port/Fd.h" #include "td/utils/port/Poll.h" #include "td/utils/port/thread.h" #include namespace td { #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED class Client::Impl final { public: Impl() { init(); } void send(Request request) { if (request.id == 0 || request.function == nullptr) { LOG(ERROR) << "Drop wrong request " << request.id; return; } requests_.push_back(std::move(request)); } Response receive(double timeout) { if (!requests_.empty()) { auto guard = scheduler_->get_current_guard(); for (auto &request : requests_) { send_closure_later(td_, &Td::request, request.id, std::move(request.function)); } requests_.clear(); } if (responses_.empty()) { scheduler_->run_main(0); } if (!responses_.empty()) { auto result = std::move(responses_.front()); responses_.pop_front(); return result; } return {0, nullptr}; } Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { { auto guard = scheduler_->get_current_guard(); td_.reset(); } while (!closed_) { scheduler_->run_main(0); } scheduler_.reset(); } private: std::deque responses_; std::vector requests_; int output_queue_ready_cnt_{0}; std::unique_ptr scheduler_; ActorOwn td_; bool closed_ = false; void init() { scheduler_ = std::make_unique(); scheduler_->init(0); class Callback : public TdCallback { public: Callback(Impl *client) : client_(client) { } void on_result(std::uint64_t id, td_api::object_ptr result) override { client_->responses_.push_back({id, std::move(result)}); } void on_error(std::uint64_t id, td_api::object_ptr error) override { client_->responses_.push_back({id, std::move(error)}); } void on_closed() override { client_->closed_ = true; Scheduler::instance()->yield(); } private: Impl *client_; }; td_ = scheduler_->create_actor_unsafe(0, "Td", make_unique(this)); scheduler_->start(); } }; #else /*** TdProxy ***/ using InputQueue = MpscPollableQueue; using OutputQueue = MpscPollableQueue; class TdProxy : public Actor { public: TdProxy(std::shared_ptr input_queue, std::shared_ptr output_queue) : input_queue_(std::move(input_queue)), output_queue_(std::move(output_queue)) { } private: std::shared_ptr input_queue_; std::shared_ptr output_queue_; bool is_td_closed_ = false; bool was_hangup_ = false; ActorOwn td_; void start_up() override { auto &fd = input_queue_->reader_get_event_fd(); fd.get_fd().set_observer(this); ::td::subscribe(fd.get_fd(), Fd::Read); class Callback : public TdCallback { public: Callback(ActorId parent, std::shared_ptr output_queue) : parent_(parent), output_queue_(std::move(output_queue)) { } void on_result(std::uint64_t id, td_api::object_ptr result) override { output_queue_->writer_put({id, std::move(result)}); } void on_error(std::uint64_t id, td_api::object_ptr error) override { output_queue_->writer_put({id, std::move(error)}); } void on_closed() override { send_closure(parent_, &TdProxy::on_closed); } private: ActorId parent_; std::shared_ptr output_queue_; }; td_ = create_actor("Td", make_unique(actor_id(this), std::move(output_queue_))); yield(); } void on_closed() { is_td_closed_ = true; try_stop(); } void try_stop() { if (!is_td_closed_ || !was_hangup_) { return; } Scheduler::instance()->finish(); stop(); } void loop() override { while (true) { int size = input_queue_->reader_wait_nonblock(); if (size == 0) { return; } for (int i = 0; i < size; i++) { auto request = input_queue_->reader_get_unsafe(); if (request.id == 0 && request.function == nullptr) { was_hangup_ = true; td_.reset(); return try_stop(); } send_closure_later(td_, &Td::request, request.id, std::move(request.function)); } } } void hangup() override { UNREACHABLE(); } void tear_down() override { auto &fd = input_queue_->reader_get_event_fd(); ::td::unsubscribe(fd.get_fd()); fd.get_fd().set_observer(nullptr); } }; /*** Client::Impl ***/ class Client::Impl final : ObserverBase { public: Impl() { init(); } void send(Request request) { if (request.id == 0 || request.function == nullptr) { LOG(ERROR) << "Drop wrong request " << request.id; return; } input_queue_->writer_put(std::move(request)); } Response receive(double timeout) { if (output_queue_ready_cnt_ == 0) { output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); } if (output_queue_ready_cnt_ > 0) { output_queue_ready_cnt_--; return output_queue_->reader_get_unsafe(); } if (timeout != 0) { poll_.run(static_cast(timeout * 1000)); return receive(0); } return {0, nullptr}; } Impl(const Impl &) = delete; Impl &operator=(const Impl &) = delete; Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { input_queue_->writer_put({0, nullptr}); scheduler_thread_.join(); } private: Poll poll_; std::shared_ptr input_queue_; std::shared_ptr output_queue_; std::shared_ptr scheduler_; int output_queue_ready_cnt_{0}; thread scheduler_thread_; bool notify_flag_{false}; void init() { input_queue_ = std::make_shared(); input_queue_->init(); output_queue_ = std::make_shared(); output_queue_->init(); scheduler_ = std::make_shared(); scheduler_->init(3); scheduler_->create_actor_unsafe(0, "TdProxy", input_queue_, output_queue_).release(); scheduler_->start(); scheduler_thread_ = thread([scheduler = scheduler_] { while (scheduler->run_main(10)) { } scheduler->finish(); }); poll_.init(); auto &event_fd = output_queue_->reader_get_event_fd(); event_fd.get_fd().set_observer(this); poll_.subscribe(event_fd.get_fd(), Fd::Read); } void notify() override { notify_flag_ = true; } }; #endif /*** Client ***/ Client::Client() : impl_(make_unique()) { // At least it should be enough for everybody who uses TDLib init_openssl_threads(); } void Client::send(Request &&request) { impl_->send(std::move(request)); } Client::Response Client::receive(double timeout) { return impl_->receive(timeout); } Client::Response Client::execute(Request &&request) { Response response; response.id = request.id; response.object = Td::static_request(std::move(request.function)); return response; } Client::~Client() = default; Client::Client(Client &&other) = default; Client &Client::operator=(Client &&other) = default; } // namespace td