Fix ClientManager closing.

GitOrigin-RevId: eb588d9991ea6c8c2b4a339d1396d58179c84f43
This commit is contained in:
levlam 2020-10-08 01:28:24 +03:00
parent f61bd5b89a
commit 5ac5acd1cb

View File

@ -92,20 +92,26 @@ class ClientManager::Impl final {
}
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
Request request;
request.client_id = client_id;
request.id = request_id;
request.request = std::move(request);
requests_.push_back(std::move(request));
requests_.push_back({client_id, request_id, std::move(request)});
}
Response receive(double timeout) {
if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &request : requests_) {
auto &td = tds_[request.client_id];
CHECK(!td.empty());
send_closure_later(td, &Td::request, request.id, std::move(request.request));
for (size_t i = 0; i < requests_.size(); i++) {
auto &request = requests_[i];
auto it = tds_.find(request.client_id);
if (it == tds_.end()) {
receiver_->add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
continue;
}
if (it->second.empty()) {
receiver_->add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(500, "Request aborted"));
continue;
}
send_closure_later(it->second, &Td::request, request.id, std::move(request.request));
}
requests_.clear();
}
@ -117,9 +123,21 @@ class ClientManager::Impl final {
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
}
if (response.request_id == 0 && response.object != nullptr &&
response.object->get_id() == td::td_api::updateAuthorizationState::ID &&
static_cast<const td::td_api::updateAuthorizationState *>(response.object.get())
->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) {
auto it = tds_.find(response.client_id);
CHECK(it != tds_.end());
it->second.reset();
}
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto guard = concurrent_scheduler_->get_main_guard();
tds_.erase(response.client_id);
auto it = tds_.find(response.client_id);
CHECK(it != tds_.end());
CHECK(it->second.empty());
tds_.erase(it);
response.client_id = 0;
}
return response;
}
@ -132,7 +150,7 @@ class ClientManager::Impl final {
{
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &td : tds_) {
td.second = {};
td.second.reset();
}
}
while (!tds_.empty()) {
@ -148,7 +166,7 @@ class ClientManager::Impl final {
RequestId id;
td_api::object_ptr<td_api::Function> request;
};
td::vector<Request> requests_;
vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0};
Td::Options options_;
@ -157,18 +175,18 @@ class ClientManager::Impl final {
class Client::Impl final {
public:
Impl() {
client_id_ = impl_.create_client();
Impl() : client_id_(impl_.create_client()) {
}
void send(Request request) {
impl_.send(client_id_, request.id, std::move(request.request));
impl_.send(client_id_, request.id, std::move(request.function));
}
Response receive(double timeout) {
auto response = impl_.receive(timeout);
Response old_response;
old_response.id = response.id;
old_response.id = response.request_id;
old_response.object = std::move(response.object);
return old_response;
}
@ -205,8 +223,8 @@ class MultiTd : public Actor {
}
void close(int32 td_id) {
// no check that td_id hasn't been deleted before
tds_.erase(td_id);
size_t erased = tds_.erase(td_id);
CHECK(erased > 0);
}
private:
@ -364,12 +382,12 @@ class MultiImplPool {
}
auto &impl = *std::min_element(impls_.begin(), impls_.end(),
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
auto res = impl.lock();
if (!res) {
res = std::make_shared<MultiImpl>(net_query_stats_);
impl = res;
auto result = impl.lock();
if (!result) {
result = std::make_shared<MultiImpl>(net_query_stats_);
impl = result;
}
return res;
return result;
}
private:
@ -385,7 +403,7 @@ class ClientManager::Impl final {
auto client_id = impl->create(*receiver_);
{
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_[client_id] = std::move(impl);
impls_[client_id].impl = std::move(impl);
}
return client_id;
}
@ -398,18 +416,42 @@ class ClientManager::Impl final {
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
return;
}
it->second->send(client_id, request_id, std::move(request));
if (it->second.is_closed) {
receiver_->add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted"));
return;
}
it->second.impl->send(client_id, request_id, std::move(request));
}
Response receive(double timeout) {
auto response = receiver_->receive(timeout);
if (response.request_id == 0 && response.object != nullptr &&
response.object->get_id() == td::td_api::updateAuthorizationState::ID &&
static_cast<const td::td_api::updateAuthorizationState *>(response.object.get())
->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) {
auto lock = impls_mutex_.lock_write().move_as_ok();
close_impl(response.client_id);
}
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_.erase(response.client_id);
auto it = impls_.find(response.client_id);
CHECK(it != impls_.end());
CHECK(it->second.is_closed);
impls_.erase(it);
response.client_id = 0;
}
return response;
}
void close_impl(ClientId client_id) {
auto it = impls_.find(client_id);
CHECK(it != impls_.end());
if (!it->second.is_closed) {
it->second.is_closed = true;
it->second.impl->close(client_id);
}
}
Impl() = default;
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
@ -417,7 +459,7 @@ class ClientManager::Impl final {
Impl &operator=(Impl &&) = delete;
~Impl() {
for (auto &it : impls_) {
it.second->close(it.first);
close_impl(it.first);
}
while (!impls_.empty()) {
receive(10);
@ -427,7 +469,11 @@ class ClientManager::Impl final {
private:
MultiImplPool pool_;
RwMutex impls_mutex_;
std::unordered_map<ClientId, std::shared_ptr<MultiImpl>> impls_;
struct MultiImplInfo {
std::shared_ptr<MultiImpl> impl;
bool is_closed = false;
};
std::unordered_map<ClientId, MultiImplInfo> impls_;
unique_ptr<TdReceiver> receiver_{make_unique<TdReceiver>()};
};
@ -440,7 +486,7 @@ class Client::Impl final {
td_id_ = multi_impl_->create(*receiver_);
}
void send(Client::Request request) {
void send(Request request) {
if (request.id == 0 || request.function == nullptr) {
LOG(ERROR) << "Drop wrong request " << request.id;
return;
@ -449,13 +495,13 @@ class Client::Impl final {
multi_impl_->send(td_id_, request.id, std::move(request.function));
}
Client::Response receive(double timeout) {
auto res = receiver_->receive(timeout);
Response receive(double timeout) {
auto response = receiver_->receive(timeout);
Client::Response old_res;
old_res.id = res.request_id;
old_res.object = std::move(res.object);
return old_res;
Response old_response;
old_response.id = response.request_id;
old_response.object = std::move(response.object);
return old_response;
}
Impl(const Impl &) = delete;