Simplify Query creation and statistics.

This commit is contained in:
levlam 2021-06-14 22:47:01 +03:00
parent 54112379ff
commit 7de971ec20
6 changed files with 49 additions and 55 deletions

View File

@ -3494,10 +3494,7 @@ void Client::start_up() {
void Client::send(PromisedQueryPtr query) { void Client::send(PromisedQueryPtr query) {
if (!query->is_internal()) { if (!query->is_internal()) {
send_closure( query->set_stat_actor(stat_actor_);
stat_actor_, &BotStatActor::add_event<ServerBotStat::Request>,
ServerBotStat::Request{query->query_size(), query->file_count(), query->files_size(), query->files_max_size()},
td::Time::now());
} }
cmd_queue_.emplace(std::move(query)); cmd_queue_.emplace(std::move(query));
loop(); loop();

View File

@ -115,30 +115,25 @@ void ClientManager::send(PromisedQueryPtr query) {
auto id = clients_.create(ClientInfo{BotStatActor(stat_.actor_id(&stat_)), token, td::ActorOwn<Client>()}); auto id = clients_.create(ClientInfo{BotStatActor(stat_.actor_id(&stat_)), token, td::ActorOwn<Client>()});
auto *client_info = clients_.get(id); auto *client_info = clients_.get(id);
auto stat_actor = client_info->stat_.actor_id(&client_info->stat_); client_info->client_ =
auto client_id = td::create_actor<Client>( td::create_actor<Client>(PSLICE() << "Client/" << token, actor_shared(this, id), query->token().str(),
PSLICE() << "Client/" << token, actor_shared(this, id), query->token().str(), query->is_test_dc(), query->is_test_dc(), get_tqueue_id(r_user_id.ok(), query->is_test_dc()), parameters_,
get_tqueue_id(r_user_id.ok(), query->is_test_dc()), parameters_, std::move(stat_actor)); client_info->stat_.actor_id(&client_info->stat_));
auto method = query->method(); auto method = query->method();
if (method != "deletewebhook" && method != "setwebhook") { if (method != "deletewebhook" && method != "setwebhook") {
auto bot_token_with_dc = PSTRING() << query->token() << (query->is_test_dc() ? ":T" : ""); auto bot_token_with_dc = PSTRING() << query->token() << (query->is_test_dc() ? ":T" : "");
auto webhook_info = parameters_->shared_data_->webhook_db_->get(bot_token_with_dc); auto webhook_info = parameters_->shared_data_->webhook_db_->get(bot_token_with_dc);
if (!webhook_info.empty()) { if (!webhook_info.empty()) {
send_closure(client_id, &Client::send, send_closure(client_info->client_, &Client::send,
get_webhook_restore_query(bot_token_with_dc, webhook_info, parameters_->shared_data_)); get_webhook_restore_query(bot_token_with_dc, webhook_info, parameters_->shared_data_));
} }
} }
clients_.get(id)->client_ = std::move(client_id);
std::tie(id_it, std::ignore) = token_to_id_.emplace(token, id); std::tie(id_it, std::ignore) = token_to_id_.emplace(token, id);
} }
auto *client_info = clients_.get(id_it->second); send_closure(clients_.get(id_it->second)->client_, &Client::send,
std::move(query)); // will send 429 if the client is already closed
if (!query->is_internal()) {
query->set_stat_actor(client_info->stat_.actor_id(&client_info->stat_));
}
send_closure(client_info->client_, &Client::send, std::move(query)); // will send 429 if the client is already closed
} }
void ClientManager::get_stats(td::PromiseActor<td::BufferSlice> promise, void ClientManager::get_stats(td::PromiseActor<td::BufferSlice> promise,
@ -367,8 +362,7 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, td::S
const auto method = add_string("setwebhook"); const auto method = add_string("setwebhook");
auto query = std::make_unique<Query>(std::move(containers), token, is_test_dc, method, std::move(args), auto query = std::make_unique<Query>(std::move(containers), token, is_test_dc, method, std::move(args),
td::vector<std::pair<td::MutableSlice, td::MutableSlice>>(), td::vector<std::pair<td::MutableSlice, td::MutableSlice>>(),
td::vector<td::HttpFile>(), std::move(shared_data), td::IPAddress()); td::vector<td::HttpFile>(), std::move(shared_data), td::IPAddress(), true);
query->set_internal(true);
return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>())); return PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>()));
} }

View File

@ -46,7 +46,7 @@ void HttpConnection::handle(td::unique_ptr<td::HttpQuery> http_query,
auto method = url_path_parser.data(); auto method = url_path_parser.data();
auto query = std::make_unique<Query>(std::move(http_query->container_), token, is_test_dc, method, auto query = std::make_unique<Query>(std::move(http_query->container_), token, is_test_dc, method,
std::move(http_query->args_), std::move(http_query->headers_), std::move(http_query->args_), std::move(http_query->headers_),
std::move(http_query->files_), shared_data_, http_query->peer_address_); std::move(http_query->files_), shared_data_, http_query->peer_address_, false);
td::PromiseActor<td::unique_ptr<Query>> promise; td::PromiseActor<td::unique_ptr<Query>> promise;
td::FutureActor<td::unique_ptr<Query>> future; td::FutureActor<td::unique_ptr<Query>> future;

View File

@ -26,7 +26,7 @@ std::unordered_map<td::string, std::unique_ptr<td::VirtuallyJsonable>> empty_par
Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_test_dc, td::MutableSlice method, Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_test_dc, td::MutableSlice method,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args, td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files, td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files,
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address) std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address, bool is_internal)
: state_(State::Query) : state_(State::Query)
, shared_data_(shared_data) , shared_data_(shared_data)
, peer_address_(peer_address) , peer_address_(peer_address)
@ -36,7 +36,8 @@ Query::Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_t
, method_(method) , method_(method)
, args_(std::move(args)) , args_(std::move(args))
, headers_(std::move(headers)) , headers_(std::move(headers))
, files_(std::move(files)) { , files_(std::move(files))
, is_internal_(is_internal) {
if (method_.empty()) { if (method_.empty()) {
method_ = arg("method"); method_ = arg("method");
} }
@ -67,6 +68,11 @@ td::int64 Query::files_max_size() const {
[](td::int64 acc, const td::HttpFile &file) { return td::max(acc, file.size); }); [](td::int64 acc, const td::HttpFile &file) { return td::max(acc, file.size); });
} }
void Query::set_stat_actor(td::ActorId<BotStatActor> stat_actor) {
stat_actor_ = stat_actor;
send_request_stat();
}
void Query::set_ok(td::BufferSlice result) { void Query::set_ok(td::BufferSlice result) {
CHECK(state_ == State::Query); CHECK(state_ == State::Query);
LOG(INFO) << "QUERY: got ok " << td::tag("ptr", this) << td::tag("text", result.as_slice()); LOG(INFO) << "QUERY: got ok " << td::tag("ptr", this) << td::tag("text", result.as_slice());
@ -109,12 +115,20 @@ td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &query) {
return sb; return sb;
} }
void Query::send_response_stat() { void Query::send_request_stat() const {
if (stat_actor_.empty()) {
return;
}
send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Request>,
ServerBotStat::Request{query_size(), file_count(), files_size(), files_max_size()}, td::Time::now());
}
void Query::send_response_stat() const {
if (stat_actor_.empty()) { if (stat_actor_.empty()) {
return; return;
} }
send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Response>, send_closure(stat_actor_, &BotStatActor::add_event<ServerBotStat::Response>,
ServerBotStat::Response{is_ok(), answer().size()}, td::Time::now()); ServerBotStat::Response{state_ == State::OK, answer_.size()}, td::Time::now());
} }
} // namespace telegram_bot_api } // namespace telegram_bot_api

View File

@ -73,17 +73,6 @@ class Query : public td::ListNode {
return peer_address_; return peer_address_;
} }
// for stats
td::int32 file_count() const {
return static_cast<td::int32>(files_.size());
}
td::int64 query_size() const;
td::int64 files_size() const;
td::int64 files_max_size() const;
td::BufferSlice &answer() { td::BufferSlice &answer() {
return answer_; return answer_;
} }
@ -106,26 +95,14 @@ class Query : public td::ListNode {
return state_ != State::Query; return state_ != State::Query;
} }
bool is_error() const {
return state_ == State::Error;
}
bool is_ok() const {
return state_ == State::OK;
}
bool is_internal() const { bool is_internal() const {
return is_internal_; return is_internal_;
} }
void set_internal(bool is_internal) {
is_internal_ = is_internal;
}
Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_test_dc, td::MutableSlice method, Query(td::vector<td::BufferSlice> &&container, td::Slice token, bool is_test_dc, td::MutableSlice method,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args, td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&args,
td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files, td::vector<std::pair<td::MutableSlice, td::MutableSlice>> &&headers, td::vector<td::HttpFile> &&files,
std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address); std::shared_ptr<SharedData> shared_data, const td::IPAddress &peer_address, bool is_internal);
Query(const Query &) = delete; Query(const Query &) = delete;
Query &operator=(const Query &) = delete; Query &operator=(const Query &) = delete;
Query(Query &&) = delete; Query(Query &&) = delete;
@ -140,10 +117,7 @@ class Query : public td::ListNode {
return start_timestamp_; return start_timestamp_;
} }
void set_stat_actor(td::ActorId<BotStatActor> stat_actor) { void set_stat_actor(td::ActorId<BotStatActor> stat_actor);
stat_actor_ = stat_actor;
}
void send_response_stat();
private: private:
State state_; State state_;
@ -166,6 +140,21 @@ class Query : public td::ListNode {
td::BufferSlice answer_; td::BufferSlice answer_;
int http_status_code_ = 0; int http_status_code_ = 0;
int retry_after_ = 0; int retry_after_ = 0;
// for stats
td::int32 file_count() const {
return static_cast<td::int32>(files_.size());
}
td::int64 query_size() const;
td::int64 files_size() const;
td::int64 files_max_size() const;
void send_request_stat() const;
void send_response_stat() const;
}; };
td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &query); td::StringBuilder &operator<<(td::StringBuilder &sb, const Query &query);

View File

@ -586,10 +586,10 @@ void WebhookActor::handle(td::unique_ptr<td::HttpQuery> response) {
if (!method.empty() && method != "deletewebhook" && method != "setwebhook" && method != "close" && if (!method.empty() && method != "deletewebhook" && method != "setwebhook" && method != "close" &&
method != "logout" && !td::begins_with(method, "get")) { method != "logout" && !td::begins_with(method, "get")) {
VLOG(webhook) << "Receive request " << method << " in response to webhook"; VLOG(webhook) << "Receive request " << method << " in response to webhook";
auto query = auto query = std::make_unique<Query>(std::move(response->container_), td::MutableSlice(), false,
std::make_unique<Query>(std::move(response->container_), td::MutableSlice(), false, td::MutableSlice(), td::MutableSlice(), std::move(response->args_),
std::move(response->args_), std::move(response->headers_), std::move(response->headers_), std::move(response->files_),
std::move(response->files_), parameters_->shared_data_, response->peer_address_); parameters_->shared_data_, response->peer_address_, false);
auto promised_query = auto promised_query =
PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>())); PromisedQueryPtr(query.release(), PromiseDeleter(td::PromiseActor<td::unique_ptr<Query>>()));
send_closure(callback_, &Callback::send, std::move(promised_query)); send_closure(callback_, &Callback::send, std::move(promised_query));