Fix responses queue

This commit is contained in:
Andrea Cavalli 2020-08-31 19:19:04 +02:00
parent 6e10e6f012
commit ffea9135e8

View File

@ -88,10 +88,18 @@ class TdReceiver {
Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
} }
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
impl_->responses_.push({client_id_, id, std::move(result)}); if (id == 0) {
impl_->updates_.push({client_id_, 0, std::move(result)});
} else {
impl_->responses_.push({client_id_, id, std::move(result)});
}
} }
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override { void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
impl_->responses_.push({client_id_, id, std::move(error)}); if (id == 0) {
impl_->updates_.push({client_id_, 0, std::move(error)});
} else {
impl_->responses_.push({client_id_, id, std::move(error)});
}
} }
Callback(const Callback &) = delete; Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete; Callback &operator=(const Callback &) = delete;
@ -230,10 +238,10 @@ class Client::Impl final {
class TdReceiver { class TdReceiver {
public: public:
TdReceiver() { TdReceiver() {
output_updates_queue_ = std::make_shared<OutputQueue>();
output_updates_queue_->init();
output_responses_queue_ = std::make_shared<OutputQueue>(); output_responses_queue_ = std::make_shared<OutputQueue>();
output_responses_queue_->init(); output_responses_queue_->init();
output_updates_queue_ = std::make_shared<OutputQueue>();
output_updates_queue_->init();
} }
MultiClient::Response receive(double timeout) { MultiClient::Response receive(double timeout) {
@ -254,14 +262,22 @@ class TdReceiver {
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) { unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_updates_queue, std::shared_ptr<OutputQueue> output_responses_queue) explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_responses_queue, std::shared_ptr<OutputQueue> output_updates_queue)
: client_id_(client_id), output_updates_queue_(std::move(output_updates_queue)), output_responses_queue_(std::move(output_responses_queue)) { : client_id_(client_id), output_responses_queue_(std::move(output_responses_queue), output_updates_queue_(std::move(output_updates_queue))) {
} }
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
output_responses_queue_->writer_put({client_id_, id, std::move(result)}); if (id == 0) {
output_updates_queue_->writer_put({client_id_, 0, std::move(result)});
} else {
output_responses_queue_->writer_put({client_id_, id, std::move(result)});
}
} }
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override { void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
output_responses_queue_->writer_put({client_id_, id, std::move(error)}); if (id == 0) {
output_updates_queue_->writer_put({client_id_, 0, std::move(error)});
} else {
output_responses_queue_->writer_put({client_id_, id, std::move(error)});
}
} }
Callback(const Callback &) = delete; Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete; Callback &operator=(const Callback &) = delete;
@ -273,18 +289,18 @@ class TdReceiver {
private: private:
MultiClient::ClientId client_id_; MultiClient::ClientId client_id_;
std::shared_ptr<OutputQueue> output_updates_queue_;
std::shared_ptr<OutputQueue> output_responses_queue_; std::shared_ptr<OutputQueue> output_responses_queue_;
std::shared_ptr<OutputQueue> output_updates_queue_;
}; };
return td::make_unique<Callback>(client_id, output_updates_queue_, output_responses_queue_); return td::make_unique<Callback>(client_id, output_responses_queue_, output_updates_queue_);
} }
private: private:
using OutputQueue = MpscPollableQueue<MultiClient::Response>; using OutputQueue = MpscPollableQueue<MultiClient::Response>;
std::shared_ptr<OutputQueue> output_updates_queue_;
std::shared_ptr<OutputQueue> output_responses_queue_; std::shared_ptr<OutputQueue> output_responses_queue_;
int output_updates_queue_ready_cnt_{0}; std::shared_ptr<OutputQueue> output_updates_queue_;
int output_responses_queue_ready_cnt_{0}; int output_responses_queue_ready_cnt_{0};
int output_updates_queue_ready_cnt_{0};
std::atomic<bool> receive_lock_{false}; std::atomic<bool> receive_lock_{false};
MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) { MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) {