Clear ClientManager when the last Client is closed.

GitOrigin-RevId: d2d5194f7ffc59dc8db1196401c80689a8645dac
This commit is contained in:
levlam 2020-10-09 15:39:30 +03:00
parent b491964a81
commit 38f72b353a
3 changed files with 91 additions and 48 deletions

View File

@ -76,14 +76,15 @@ class TdReceiver {
class ClientManager::Impl final {
public:
Impl() {
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
concurrent_scheduler_->start();
}
ClientId create_client() {
if (tds_.empty()) {
CHECK(concurrent_scheduler_ == nullptr);
CHECK(options_.net_query_stats == nullptr);
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
concurrent_scheduler_->start();
}
auto client_id = ++client_id_;
tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_.create_callback(client_id), options_);
@ -96,7 +97,6 @@ class ClientManager::Impl final {
Response receive(double timeout) {
if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_main_guard();
for (size_t i = 0; i < requests_.size(); i++) {
auto &request = requests_[i];
if (request.client_id <= 0 || request.client_id > client_id_) {
@ -110,13 +110,16 @@ class ClientManager::Impl final {
td_api::make_object<td_api::error>(500, "Request aborted"));
continue;
}
CHECK(concurrent_scheduler_ != nullptr);
auto guard = concurrent_scheduler_->get_main_guard();
send_closure_later(it->second, &Td::request, request.id, std::move(request.request));
}
requests_.clear();
}
auto response = receiver_.receive(0);
if (response.client_id == 0) {
if (response.client_id == 0 && concurrent_scheduler_ != nullptr) {
concurrent_scheduler_->run_main(0);
response = receiver_.receive(0);
} else {
@ -132,12 +135,22 @@ class ClientManager::Impl final {
it->second.reset();
}
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
CHECK(concurrent_scheduler_ != nullptr);
auto guard = concurrent_scheduler_->get_main_guard();
auto it = tds_.find(response.client_id);
CHECK(it != tds_.end());
CHECK(it->second.empty());
tds_.erase(it);
response.client_id = 0;
if (tds_.empty()) {
CHECK(options_.net_query_stats.use_count() == 1);
CHECK(options_.net_query_stats->get_count() == 0);
options_.net_query_stats = nullptr;
concurrent_scheduler_->finish();
concurrent_scheduler_ = nullptr;
reset_to_empty(tds_);
}
}
return response;
}
@ -147,6 +160,10 @@ class ClientManager::Impl final {
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() {
if (concurrent_scheduler_ == nullptr) {
return;
}
{
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &td : tds_) {
@ -388,6 +405,8 @@ class MultiImplPool {
init_openssl_threads();
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
net_query_stats_ = std::make_shared<NetQueryStats>();
}
auto &impl = *std::min_element(impls_.begin(), impls_.end(),
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
@ -399,10 +418,28 @@ class MultiImplPool {
return result;
}
void try_clear() {
std::unique_lock<std::mutex> lock(mutex_);
if (impls_.empty()) {
return;
}
for (auto &impl : impls_) {
if (impl.lock().use_count() != 0) {
return;
}
}
reset_to_empty(impls_);
CHECK(net_query_stats_.use_count() == 1);
CHECK(net_query_stats_->get_count() == 0);
net_query_stats_ = nullptr;
}
private:
std::mutex mutex_;
std::vector<std::weak_ptr<MultiImpl>> impls_;
std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>();
std::shared_ptr<NetQueryStats> net_query_stats_;
};
class ClientManager::Impl final {
@ -448,6 +485,11 @@ class ClientManager::Impl final {
CHECK(it->second.is_closed);
impls_.erase(it);
response.client_id = 0;
if (impls_.empty()) {
reset_to_empty(impls_);
pool_.try_clear();
}
}
return response;
}

View File

@ -260,13 +260,13 @@ class Td final : public NetQueryCallback {
void dec_stop_cnt();
unique_ptr<TdCallback> callback_;
Options td_options_;
MtprotoHeader::Options options_;
TdParameters parameters_;
unique_ptr<TdCallback> callback_;
Options td_options_;
StateManager::State connection_state_;
std::unordered_multiset<uint64> request_set_;

View File

@ -1092,7 +1092,6 @@ TEST(Client, ManagerClose) {
#endif
TEST(Client, ManagerCloseOneThread) {
SET_VERBOSITY_LEVEL(2);
td::ClientManager client_manager;
td::uint64 request_id = 2;
@ -1133,42 +1132,44 @@ TEST(Client, ManagerCloseOneThread) {
}
};
for (td::int32 i = -5; i <= 0; i++) {
send_request(i, 400);
for (int t = 0; t < 3; t++) {
for (td::int32 i = -5; i <= 0; i++) {
send_request(i, 400);
}
receive();
auto client_id = client_manager.create_client();
for (td::int32 i = -5; i < 5; i++) {
send_request(i, i == client_id ? 0 : (i > 0 && i < client_id ? 500 : 400));
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 0);
}
receive();
sent_count++;
sent_requests.emplace(1, 0);
client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
}
receive();
auto client_id = client_manager.create_client();
for (td::int32 i = -5; i < 5; i++) {
send_request(i, i == client_id ? 0 : (i > 0 && i < client_id ? 500 : 400));
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 0);
}
receive();
sent_count++;
sent_requests.emplace(1, 0);
client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
ASSERT_TRUE(sent_requests.empty());
}