Count number of active clients with a given tqueue_id.

This commit is contained in:
levlam 2021-09-30 23:23:41 +03:00
parent 9380c2a3d1
commit e715de9e1c
3 changed files with 38 additions and 8 deletions

View File

@ -3499,7 +3499,7 @@ void Client::raw_event(const td::Event::Raw &event) {
} }
void Client::loop() { void Client::loop() {
if (logging_out_ || closing_ || was_authorized_) { if (was_authorized_ || logging_out_ || closing_) {
while (!cmd_queue_.empty()) { while (!cmd_queue_.empty()) {
auto query = std::move(cmd_queue_.front()); auto query = std::move(cmd_queue_.front());
cmd_queue_.pop(); cmd_queue_.pop();
@ -4081,6 +4081,7 @@ void Client::on_update_authorization_state() {
if (!was_authorized_) { if (!was_authorized_) {
LOG(WARNING) << "Logged in as @" << user_info->username; LOG(WARNING) << "Logged in as @" << user_info->username;
was_authorized_ = true; was_authorized_ = true;
td::send_event(parent_, td::Event::raw(static_cast<void *>(this)));
update_shared_unix_time_difference(); update_shared_unix_time_difference();
if (!pending_updates_.empty()) { if (!pending_updates_.empty()) {
LOG(INFO) << "Process " << pending_updates_.size() << " pending updates"; LOG(INFO) << "Process " << pending_updates_.size() << " pending updates";
@ -4096,14 +4097,20 @@ void Client::on_update_authorization_state() {
if (!logging_out_) { if (!logging_out_) {
LOG(WARNING) << "Logging out"; LOG(WARNING) << "Logging out";
logging_out_ = true; logging_out_ = true;
if (was_authorized_ && !closing_) {
td::send_event(parent_, td::Event::raw(nullptr));
} }
break; }
return loop();
case td_api::authorizationStateClosing::ID: case td_api::authorizationStateClosing::ID:
if (!closing_) { if (!closing_) {
LOG(WARNING) << "Closing"; LOG(WARNING) << "Closing";
closing_ = true; closing_ = true;
if (was_authorized_ && !logging_out_) {
td::send_event(parent_, td::Event::raw(nullptr));
} }
break; }
return loop();
case td_api::authorizationStateClosed::ID: case td_api::authorizationStateClosed::ID:
return on_closed(); return on_closed();
default: default:

View File

@ -112,12 +112,16 @@ void ClientManager::send(PromisedQueryPtr query) {
} }
flood_control.add_event(static_cast<td::int32>(now)); flood_control.add_event(static_cast<td::int32>(now));
} }
auto tqueue_id = get_tqueue_id(r_user_id.ok(), query->is_test_dc());
if (active_client_count_.find(tqueue_id) != active_client_count_.end()) {
// return query->set_retry_after_error(1);
}
auto id = clients_.create(ClientInfo{BotStatActor(stat_.actor_id(&stat_)), token, td::ActorOwn<Client>()}); auto id =
clients_.create(ClientInfo{BotStatActor(stat_.actor_id(&stat_)), token, tqueue_id, td::ActorOwn<Client>()});
auto *client_info = clients_.get(id); auto *client_info = clients_.get(id);
client_info->client_ = client_info->client_ = td::create_actor<Client>(PSLICE() << "Client/" << token, actor_shared(this, id),
td::create_actor<Client>(PSLICE() << "Client/" << token, actor_shared(this, id), query->token().str(), query->token().str(), query->is_test_dc(), tqueue_id, parameters_,
query->is_test_dc(), get_tqueue_id(r_user_id.ok(), query->is_test_dc()), parameters_,
client_info->stat_.actor_id(&client_info->stat_)); client_info->stat_.actor_id(&client_info->stat_));
auto method = query->method(); auto method = query->method();
@ -382,6 +386,21 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, td::S
return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>())); return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>()));
} }
void ClientManager::raw_event(const td::Event::Raw &event) {
auto id = get_link_token();
auto *info = clients_.get(id);
CHECK(info != nullptr);
auto &value = active_client_count_[info->tqueue_id_];
if (event.ptr != nullptr) {
value++;
} else {
CHECK(value > 0);
if (--value == 0) {
active_client_count_.erase(info->tqueue_id_);
}
}
}
void ClientManager::hangup_shared() { void ClientManager::hangup_shared() {
auto id = get_link_token(); auto id = get_link_token();
auto *info = clients_.get(id); auto *info = clients_.get(id);
@ -391,6 +410,7 @@ void ClientManager::hangup_shared() {
clients_.erase(id); clients_.erase(id);
if (close_flag_ && clients_.empty()) { if (close_flag_ && clients_.empty()) {
CHECK(active_client_count_.empty());
close_db(); close_db();
} }
} }

View File

@ -52,6 +52,7 @@ class ClientManager final : public td::Actor {
public: public:
BotStatActor stat_; BotStatActor stat_;
td::string token_; td::string token_;
td::int64 tqueue_id_;
td::ActorOwn<Client> client_; td::ActorOwn<Client> client_;
}; };
td::Container<ClientInfo> clients_; td::Container<ClientInfo> clients_;
@ -62,6 +63,7 @@ class ClientManager final : public td::Actor {
std::unordered_map<td::string, td::uint64> token_to_id_; std::unordered_map<td::string, td::uint64> token_to_id_;
std::unordered_map<td::string, td::FloodControlFast> flood_controls_; std::unordered_map<td::string, td::FloodControlFast> flood_controls_;
std::unordered_map<td::int64, td::uint64> active_client_count_;
bool close_flag_ = false; bool close_flag_ = false;
td::vector<td::Promise<td::Unit>> close_promises_; td::vector<td::Promise<td::Unit>> close_promises_;
@ -72,6 +74,7 @@ class ClientManager final : public td::Actor {
std::shared_ptr<SharedData> shared_data); std::shared_ptr<SharedData> shared_data);
void start_up() override; void start_up() override;
void raw_event(const td::Event::Raw &event) override;
void hangup_shared() override; void hangup_shared() override;
void close_db(); void close_db();
void finish_close(); void finish_close();