From ffea9135e89753d5b7a8bb8f8f56efbb6daeb08e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 31 Aug 2020 19:19:04 +0200 Subject: [PATCH] Fix responses queue --- td/telegram/Client.cpp | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index c7e023f27..5d492a6cb 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -88,10 +88,18 @@ class TdReceiver { Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { } void on_result(uint64 id, td_api::object_ptr result) override { - impl_->responses_.push({client_id_, id, std::move(result)}); + if (id == 0) { + impl_->updates_.push({client_id_, 0, std::move(result)}); + } else { + impl_->responses_.push({client_id_, id, std::move(result)}); + } } void on_error(uint64 id, td_api::object_ptr error) override { - impl_->responses_.push({client_id_, id, std::move(error)}); + if (id == 0) { + impl_->updates_.push({client_id_, 0, std::move(error)}); + } else { + impl_->responses_.push({client_id_, id, std::move(error)}); + } } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; @@ -230,10 +238,10 @@ class Client::Impl final { class TdReceiver { public: TdReceiver() { - output_updates_queue_ = std::make_shared(); - output_updates_queue_->init(); output_responses_queue_ = std::make_shared(); output_responses_queue_->init(); + output_updates_queue_ = std::make_shared(); + output_updates_queue_->init(); } MultiClient::Response receive(double timeout) { @@ -254,14 +262,22 @@ class TdReceiver { unique_ptr create_callback(MultiClient::ClientId client_id) { class Callback : public TdCallback { public: - explicit Callback(MultiClient::ClientId client_id, std::shared_ptr output_updates_queue, std::shared_ptr output_responses_queue) - : client_id_(client_id), output_updates_queue_(std::move(output_updates_queue)), output_responses_queue_(std::move(output_responses_queue)) { + explicit Callback(MultiClient::ClientId client_id, std::shared_ptr output_responses_queue, std::shared_ptr output_updates_queue) + : client_id_(client_id), output_responses_queue_(std::move(output_responses_queue), output_updates_queue_(std::move(output_updates_queue))) { } void on_result(uint64 id, td_api::object_ptr result) override { - output_responses_queue_->writer_put({client_id_, id, std::move(result)}); + if (id == 0) { + output_updates_queue_->writer_put({client_id_, 0, std::move(result)}); + } else { + output_responses_queue_->writer_put({client_id_, id, std::move(result)}); + } } void on_error(uint64 id, td_api::object_ptr error) override { - output_responses_queue_->writer_put({client_id_, id, std::move(error)}); + if (id == 0) { + output_updates_queue_->writer_put({client_id_, 0, std::move(error)}); + } else { + output_responses_queue_->writer_put({client_id_, id, std::move(error)}); + } } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; @@ -273,18 +289,18 @@ class TdReceiver { private: MultiClient::ClientId client_id_; - std::shared_ptr output_updates_queue_; std::shared_ptr output_responses_queue_; + std::shared_ptr output_updates_queue_; }; - return td::make_unique(client_id, output_updates_queue_, output_responses_queue_); + return td::make_unique(client_id, output_responses_queue_, output_updates_queue_); } private: using OutputQueue = MpscPollableQueue; - std::shared_ptr output_updates_queue_; std::shared_ptr output_responses_queue_; - int output_updates_queue_ready_cnt_{0}; + std::shared_ptr output_updates_queue_; int output_responses_queue_ready_cnt_{0}; + int output_updates_queue_ready_cnt_{0}; std::atomic receive_lock_{false}; MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) {