Store TdReceiver by value.

GitOrigin-RevId: 51dbcaf815c5ba1a42539242b1e57b456f188d38
This commit is contained in:
levlam 2020-10-09 14:25:06 +03:00
parent 35a9a93fc9
commit b491964a81

View File

@ -80,14 +80,13 @@ class ClientManager::Impl final {
options_.net_query_stats = std::make_shared<NetQueryStats>(); options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>(); concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0); concurrent_scheduler_->init(0);
receiver_ = make_unique<TdReceiver>();
concurrent_scheduler_->start(); concurrent_scheduler_->start();
} }
ClientId create_client() { ClientId create_client() {
auto client_id = ++client_id_; auto client_id = ++client_id_;
tds_[client_id] = tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id), options_); concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_.create_callback(client_id), options_);
return client_id; return client_id;
} }
@ -101,14 +100,14 @@ class ClientManager::Impl final {
for (size_t i = 0; i < requests_.size(); i++) { for (size_t i = 0; i < requests_.size(); i++) {
auto &request = requests_[i]; auto &request = requests_[i];
if (request.client_id <= 0 || request.client_id > client_id_) { if (request.client_id <= 0 || request.client_id > client_id_) {
receiver_->add_response(request.client_id, request.id, receiver_.add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified")); td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
continue; continue;
} }
auto it = tds_.find(request.client_id); auto it = tds_.find(request.client_id);
if (it == tds_.end() || it->second.empty()) { if (it == tds_.end() || it->second.empty()) {
receiver_->add_response(request.client_id, request.id, receiver_.add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(500, "Request aborted")); td_api::make_object<td_api::error>(500, "Request aborted"));
continue; continue;
} }
send_closure_later(it->second, &Td::request, request.id, std::move(request.request)); send_closure_later(it->second, &Td::request, request.id, std::move(request.request));
@ -116,10 +115,10 @@ class ClientManager::Impl final {
requests_.clear(); requests_.clear();
} }
auto response = receiver_->receive(0); auto response = receiver_.receive(0);
if (response.client_id == 0) { if (response.client_id == 0) {
concurrent_scheduler_->run_main(0); concurrent_scheduler_->run_main(0);
response = receiver_->receive(0); response = receiver_.receive(0);
} else { } else {
ConcurrentScheduler::emscripten_clear_main_timeout(); ConcurrentScheduler::emscripten_clear_main_timeout();
} }
@ -161,7 +160,7 @@ class ClientManager::Impl final {
} }
private: private:
unique_ptr<TdReceiver> receiver_; TdReceiver receiver_;
struct Request { struct Request {
ClientId client_id; ClientId client_id;
RequestId id; RequestId id;
@ -410,7 +409,7 @@ class ClientManager::Impl final {
public: public:
ClientId create_client() { ClientId create_client() {
auto impl = pool_.get(); auto impl = pool_.get();
auto client_id = impl->create(*receiver_); auto client_id = impl->create(receiver_);
{ {
auto lock = impls_mutex_.lock_write().move_as_ok(); auto lock = impls_mutex_.lock_write().move_as_ok();
impls_[client_id].impl = std::move(impl); impls_[client_id].impl = std::move(impl);
@ -421,20 +420,20 @@ class ClientManager::Impl final {
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) { void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
auto lock = impls_mutex_.lock_read().move_as_ok(); auto lock = impls_mutex_.lock_read().move_as_ok();
if (!MultiImpl::is_valid_client_id(client_id)) { if (!MultiImpl::is_valid_client_id(client_id)) {
receiver_->add_response(client_id, request_id, receiver_.add_response(client_id, request_id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified")); td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
return; return;
} }
auto it = impls_.find(client_id); auto it = impls_.find(client_id);
if (it == impls_.end() || it->second.is_closed) { if (it == impls_.end() || it->second.is_closed) {
receiver_->add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted")); receiver_.add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted"));
return; return;
} }
it->second.impl->send(client_id, request_id, std::move(request)); it->second.impl->send(client_id, request_id, std::move(request));
} }
Response receive(double timeout) { Response receive(double timeout) {
auto response = receiver_->receive(timeout); auto response = receiver_.receive(timeout);
if (response.request_id == 0 && response.object != nullptr && if (response.request_id == 0 && response.object != nullptr &&
response.object->get_id() == td_api::updateAuthorizationState::ID && response.object->get_id() == td_api::updateAuthorizationState::ID &&
static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() == static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() ==
@ -484,7 +483,7 @@ class ClientManager::Impl final {
bool is_closed = false; bool is_closed = false;
}; };
std::unordered_map<ClientId, MultiImplInfo> impls_; std::unordered_map<ClientId, MultiImplInfo> impls_;
unique_ptr<TdReceiver> receiver_{make_unique<TdReceiver>()}; TdReceiver receiver_;
}; };
class Client::Impl final { class Client::Impl final {
@ -492,8 +491,7 @@ class Client::Impl final {
Impl() { Impl() {
static MultiImplPool pool; static MultiImplPool pool;
multi_impl_ = pool.get(); multi_impl_ = pool.get();
receiver_ = make_unique<TdReceiver>(); td_id_ = multi_impl_->create(receiver_);
td_id_ = multi_impl_->create(*receiver_);
} }
void send(Request request) { void send(Request request) {
@ -506,7 +504,7 @@ class Client::Impl final {
} }
Response receive(double timeout) { Response receive(double timeout) {
auto response = receiver_->receive(timeout); auto response = receiver_.receive(timeout);
Response old_response; Response old_response;
old_response.id = response.request_id; old_response.id = response.request_id;
@ -521,7 +519,7 @@ class Client::Impl final {
~Impl() { ~Impl() {
multi_impl_->close(td_id_); multi_impl_->close(td_id_);
while (true) { while (true) {
auto response = receiver_->receive(10.0); auto response = receiver_.receive(10.0);
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) { if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
break; break;
} }
@ -530,7 +528,7 @@ class Client::Impl final {
private: private:
std::shared_ptr<MultiImpl> multi_impl_; std::shared_ptr<MultiImpl> multi_impl_;
unique_ptr<TdReceiver> receiver_; TdReceiver receiver_;
int32 td_id_; int32 td_id_;
}; };