diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index 78094090a..98d5c4062 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -108,26 +108,28 @@ class Client::Impl final { #else -/*** TdProxy ***/ -using InputQueue = MpscPollableQueue; using OutputQueue = MpscPollableQueue; class TdProxy : public Actor { public: - TdProxy(std::shared_ptr input_queue, std::shared_ptr output_queue) - : input_queue_(std::move(input_queue)), output_queue_(std::move(output_queue)) { + explicit TdProxy(std::shared_ptr output_queue) : output_queue_(std::move(output_queue)) { + } + + void request(uint64 id, tl_object_ptr function) { + if (id == 0 && function == nullptr) { + was_hangup_ = true; + td_.reset(); + return try_stop(); + } + send_closure(td_, &Td::request, id, std::move(function)); } private: - std::shared_ptr input_queue_; std::shared_ptr output_queue_; bool is_td_closed_ = false; bool was_hangup_ = false; ActorOwn td_; void start_up() override { - auto &fd = input_queue_->reader_get_event_fd(); - Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); - class Callback : public TdCallback { public: Callback(ActorId parent, std::shared_ptr output_queue) @@ -163,33 +165,6 @@ class TdProxy : public Actor { Scheduler::instance()->finish(); stop(); } - - void loop() override { - while (true) { - int size = input_queue_->reader_wait_nonblock(); - if (size == 0) { - return; - } - for (int i = 0; i < size; i++) { - auto request = input_queue_->reader_get_unsafe(); - if (request.id == 0 && request.function == nullptr) { - was_hangup_ = true; - td_.reset(); - return try_stop(); - } - send_closure_later(td_, &Td::request, request.id, std::move(request.function)); - } - } - } - - void hangup() override { - UNREACHABLE(); - } - - void tear_down() override { - auto &fd = input_queue_->reader_get_event_fd(); - Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); - } }; /*** Client::Impl ***/ @@ -205,7 +180,8 @@ class Client::Impl final { return; } - input_queue_->writer_put(std::move(request)); + auto guard = scheduler_->get_send_guard(); + send_closure(td_proxy_, &TdProxy::request, request.id, std::move(request.function)); } Response receive(double timeout) { @@ -222,26 +198,25 @@ class Client::Impl final { Impl(Impl &&) = delete; Impl &operator=(Impl &&) = delete; ~Impl() { - input_queue_->writer_put({0, nullptr}); + auto guard = scheduler_->get_send_guard(); + send_closure(td_proxy_, &TdProxy::request, 0, nullptr); scheduler_thread_.join(); } private: - std::shared_ptr input_queue_; std::shared_ptr output_queue_; std::shared_ptr scheduler_; int output_queue_ready_cnt_{0}; thread scheduler_thread_; std::atomic receive_lock_{false}; + ActorId td_proxy_; void init() { - input_queue_ = std::make_shared(); - input_queue_->init(); output_queue_ = std::make_shared(); output_queue_->init(); scheduler_ = std::make_shared(); scheduler_->init(3); - scheduler_->create_actor_unsafe(0, "TdProxy", input_queue_, output_queue_).release(); + td_proxy_ = scheduler_->create_actor_unsafe(0, "TdProxy", output_queue_).release(); scheduler_->start(); scheduler_thread_ = thread([scheduler = scheduler_] {