diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index 474bb8868..7037582ef 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -20,8 +20,6 @@ #include "td/utils/port/RwMutex.h" #include "td/utils/port/thread.h" -#include "td/utils/death_handler.h" - #include #include #include @@ -35,20 +33,11 @@ namespace td { class TdReceiver { public: ClientManager::Response receive(double timeout) { - return receive(timeout, true, true); - } - - ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) { - if (include_responses && !responses_.empty()) { + if (!responses_.empty()) { auto result = std::move(responses_.front()); responses_.pop(); return result; } - if (include_updates && !updates_.empty()) { - auto result = std::move(updates_.front()); - updates_.pop(); - return result; - } return {0, 0, nullptr}; } @@ -58,30 +47,17 @@ class TdReceiver { Callback(ClientManager::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { } void on_result(uint64 id, td_api::object_ptr result) override { - if (id == 0) { - impl_->responses_.push({client_id_, id, nullptr}); - impl_->updates_.push({client_id_, 0, std::move(result)}); - } else { - impl_->responses_.push({client_id_, id, std::move(result)}); - impl_->updates_.push({client_id_, id, nullptr}); - } + impl_->responses_.push({client_id_, id, std::move(result)}); } void on_error(uint64 id, td_api::object_ptr error) override { - if (id == 0) { - impl_->responses_.push({client_id_, 0, nullptr}); - impl_->updates_.push({client_id_, 0, std::move(error)}); - } else { - impl_->responses_.push({client_id_, id, std::move(error)}); - impl_->updates_.push({client_id_, id, nullptr}); - } + impl_->responses_.push({client_id_, id, std::move(error)}); } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; ~Callback() override { - //impl_->responses_.push({0, 0, nullptr}); - impl_->updates_.push({client_id_, 0, nullptr}); + impl_->responses_.push({client_id_, 0, nullptr}); } private: @@ -96,7 +72,6 @@ class TdReceiver { } private: - std::queue updates_; std::queue responses_; }; @@ -122,10 +97,6 @@ class ClientManager::Impl final { } Response receive(double timeout) { - return receive(timeout, true, true); - } - - Response receive(double timeout, bool include_responses, bool include_updates) { if (!requests_.empty()) { for (size_t i = 0; i < requests_.size(); i++) { auto &request = requests_[i]; @@ -148,10 +119,10 @@ class ClientManager::Impl final { requests_.clear(); } - auto response = receiver_.receive(0, include_responses, include_updates); + auto response = receiver_.receive(0); if (response.client_id == 0 && concurrent_scheduler_ != nullptr) { concurrent_scheduler_->run_main(0); - response = receiver_.receive(0, include_responses, include_updates); + response = receiver_.receive(0); } else { ConcurrentScheduler::emscripten_clear_main_timeout(); } @@ -205,7 +176,7 @@ class ClientManager::Impl final { } } while (!tds_.empty() && !ExitGuard::is_exited()) { - receive(0.1, false, true); + receive(0.1); } if (!ExitGuard::is_exited()) { // prevent closing of schedulers from already killed by OS threads concurrent_scheduler_->finish(); @@ -236,11 +207,8 @@ class Client::Impl final { } Response receive(double timeout) { - return receive(timeout, true, true); - } + auto response = impl_.receive(timeout); - Response receive(double timeout, bool include_responses, bool include_updates) { - auto response = impl_.receive(timeout, include_responses, include_updates); Response old_response; old_response.id = response.request_id; old_response.object = std::move(response.object); @@ -291,41 +259,19 @@ class MultiTd : public Actor { class TdReceiver { public: TdReceiver() { - output_responses_queue_ = std::make_shared(); - output_responses_queue_->init(); - output_updates_queue_ = std::make_shared(); - output_updates_queue_->init(); + output_queue_ = std::make_shared(); + output_queue_->init(); } ClientManager::Response receive(double timeout) { - return receive(timeout, true, true); - } - - ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) { VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout; - bool is_responses_locked = false; - bool is_updates_locked = false; - if (include_responses) { - is_responses_locked = receive_responses_lock_.exchange(true); - if (is_responses_locked) { - LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads"; - } - } - if (include_updates) { - is_updates_locked = receive_updates_lock_.exchange(true); - if (is_updates_locked) { - LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads"; - } - } - auto response = receive_unlocked(timeout, include_responses, include_updates); - if (include_updates) { - is_updates_locked = receive_updates_lock_.exchange(false); - CHECK(is_updates_locked); - } - if (include_responses) { - is_responses_locked = receive_responses_lock_.exchange(false); - CHECK(is_responses_locked); + auto is_locked = receive_lock_.exchange(true); + if (is_locked) { + LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads"; } + auto response = receive_unlocked(timeout); + is_locked = receive_lock_.exchange(false); + CHECK(is_locked); VLOG(td_requests) << "End to wait for updates, returning object " << response.request_id << ' ' << response.object.get(); return response; @@ -334,93 +280,51 @@ class TdReceiver { unique_ptr create_callback(ClientManager::ClientId client_id) { class Callback : public TdCallback { public: - explicit Callback(ClientManager::ClientId client_id, std::shared_ptr output_responses_queue, std::shared_ptr output_updates_queue) - : client_id_(client_id), output_responses_queue_(std::move(output_responses_queue)), output_updates_queue_(std::move(output_updates_queue)) { + explicit Callback(ClientManager::ClientId client_id, std::shared_ptr output_queue) + : client_id_(client_id), output_queue_(std::move(output_queue)) { } void on_result(uint64 id, td_api::object_ptr result) override { - if (id == 0) { - output_responses_queue_->writer_put({0, 0, nullptr}); - output_updates_queue_->writer_put({client_id_, 0, std::move(result)}); - } else { - output_responses_queue_->writer_put({client_id_, id, std::move(result)}); - output_updates_queue_->writer_put({0, 0, nullptr}); - } + output_queue_->writer_put({client_id_, id, std::move(result)}); } void on_error(uint64 id, td_api::object_ptr error) override { - if (id == 0) { - output_responses_queue_->writer_put({0, 0, nullptr}); - output_updates_queue_->writer_put({client_id_, 0, std::move(error)}); - } else { - output_responses_queue_->writer_put({client_id_, id, std::move(error)}); - output_updates_queue_->writer_put({0, 0, nullptr}); - } + output_queue_->writer_put({client_id_, id, std::move(error)}); } Callback(const Callback &) = delete; Callback &operator=(const Callback &) = delete; Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; ~Callback() override { - //output_responses_queue_->writer_put({0, 0, nullptr}); - output_updates_queue_->writer_put({client_id_, 0, nullptr}); + output_queue_->writer_put({client_id_, 0, nullptr}); } private: ClientManager::ClientId client_id_; - std::shared_ptr output_responses_queue_; - std::shared_ptr output_updates_queue_; + std::shared_ptr output_queue_; }; - return td::make_unique(client_id, output_responses_queue_, output_updates_queue_); + return td::make_unique(client_id, output_queue_); } void add_response(ClientManager::ClientId client_id, uint64 id, td_api::object_ptr result) { - if (id == 0) { - output_responses_queue_->writer_put({0, 0, nullptr}); - output_updates_queue_->writer_put({client_id, id, std::move(result)}); - } else { - output_responses_queue_->writer_put({client_id, id, std::move(result)}); - output_updates_queue_->writer_put({0, 0, nullptr}); - } + output_queue_->writer_put({client_id, id, std::move(result)}); } private: using OutputQueue = MpscPollableQueue; - std::shared_ptr output_responses_queue_; - std::shared_ptr output_updates_queue_; - int output_responses_queue_ready_cnt_{0}; - int output_updates_queue_ready_cnt_{0}; - std::atomic receive_responses_lock_{false}; - std::atomic receive_updates_lock_{false}; + std::shared_ptr output_queue_; + int output_queue_ready_cnt_{0}; + std::atomic receive_lock_{false}; - ClientManager::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) { - if (include_responses) { - if (output_responses_queue_ready_cnt_ == 0) { - output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock(); - } - if (output_responses_queue_ready_cnt_ > 0) { - output_responses_queue_ready_cnt_--; - return output_responses_queue_->reader_get_unsafe(); - } + ClientManager::Response receive_unlocked(double timeout) { + if (output_queue_ready_cnt_ == 0) { + output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); } - if (include_updates) { - if (output_updates_queue_ready_cnt_ == 0) { - output_updates_queue_ready_cnt_ = output_updates_queue_->reader_wait_nonblock(); - } - if (output_updates_queue_ready_cnt_ > 0) { - output_updates_queue_ready_cnt_--; - return output_updates_queue_->reader_get_unsafe(); - } + if (output_queue_ready_cnt_ > 0) { + output_queue_ready_cnt_--; + return output_queue_->reader_get_unsafe(); } if (timeout != 0) { - if (include_responses && !include_updates) { - output_responses_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); - } else if (!include_responses && include_updates) { - output_updates_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); - } else if (include_responses && include_updates) { - output_updates_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); - } else { - // This configuration (include_responses = false and include_updates = false) shouldn't be used. - } - return receive_unlocked(0, include_responses, include_updates); + output_queue_->reader_get_event_fd().wait(static_cast(timeout * 1000)); + return receive_unlocked(0); } return {0, 0, nullptr}; } @@ -575,15 +479,11 @@ class ClientManager::Impl final { } Response receive(double timeout) { - return receive(timeout, true, true); - } - - Response receive(double timeout, bool include_responses, bool include_updates) { - auto response = receiver_.receive(timeout, include_responses, include_updates); + auto response = receiver_.receive(timeout); if (response.request_id == 0 && response.object != nullptr && response.object->get_id() == td_api::updateAuthorizationState::ID && static_cast(response.object.get())->authorization_state_->get_id() == - td_api::authorizationStateClosed::ID) { + td_api::authorizationStateClosed::ID) { auto lock = impls_mutex_.lock_write().move_as_ok(); close_impl(response.client_id); @@ -630,7 +530,7 @@ class ClientManager::Impl final { close_impl(it.first); } while (!impls_.empty() && !ExitGuard::is_exited()) { - receive(0.1, false, true); + receive(0.1); } } @@ -663,11 +563,7 @@ class Client::Impl final { } Response receive(double timeout) { - return receive(timeout, true, true); - } - - Response receive(double timeout, bool include_responses, bool include_updates) { - auto response = receiver_.receive(timeout, include_responses, include_updates); + auto response = receiver_.receive(timeout); Response old_response; old_response.id = response.request_id; @@ -682,7 +578,7 @@ class Client::Impl final { ~Impl() { multi_impl_->close(td_id_); while (!ExitGuard::is_exited()) { - auto response = receiver_.receive(0.1, false, true); + auto response = receiver_.receive(0.1); if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) { break; } @@ -708,10 +604,6 @@ Client::Response Client::receive(double timeout) { return impl_->receive(timeout); } -Client::Response Client::receive(double timeout, bool include_responses, bool include_updates) { - return impl_->receive(timeout, include_responses, include_updates); -} - Client::Response Client::execute(Request &&request) { Response response; response.id = request.id; @@ -738,10 +630,6 @@ ClientManager::Response ClientManager::receive(double timeout) { return impl_->receive(timeout); } -ClientManager::Response ClientManager::receive(double timeout, bool include_responses, bool include_updates) { - return impl_->receive(timeout, include_responses, include_updates); -} - td_api::object_ptr ClientManager::execute(td_api::object_ptr &&request) { return Td::static_request(std::move(request)); } diff --git a/td/telegram/Client.h b/td/telegram/Client.h index 34eecfb79..7cb386def 100644 --- a/td/telegram/Client.h +++ b/td/telegram/Client.h @@ -103,17 +103,6 @@ class Client final { */ Response receive(double timeout); - /** - * Receives incoming updates and request responses from TDLib. May be called from any thread, but shouldn't be - * called simultaneously from two different threads. - * \param[in] timeout The maximum number of seconds allowed for this function to wait for new data. - * \param[in] include_responses Include request responses from TDLib. - * \param[in] include_responses Include updates from TDLib. - * \return An incoming update or request response. The object returned in the response may be a nullptr - * if the timeout expires. - */ - Response receive(double timeout, bool include_responses, bool include_updates); - /** * Synchronously executes TDLib requests. Only a few requests can be executed synchronously. * May be called from any thread. @@ -235,8 +224,6 @@ class ClientManager final { */ Response receive(double timeout); - Response receive(double timeout, bool include_responses, bool include_updates); - /** * Synchronously executes TDLib requests. Only a few requests can be executed synchronously. * May be called from any thread.