Improve Session::Query field names.

This commit is contained in:
levlam 2023-08-31 00:47:39 +03:00
parent b90bc7be4b
commit 53b48b4ee8
2 changed files with 61 additions and 57 deletions

View File

@ -509,7 +509,7 @@ void Session::close() {
connection_close(&long_poll_connection_);
for (auto &it : sent_queries_) {
auto &query = it.second.query;
auto &query = it.second.net_query_;
query->set_message_id(0);
query->cancel_slot_.clear_event();
pending_queries_.push(std::move(query));
@ -544,7 +544,7 @@ void Session::raw_event(const Event::Raw &event) {
dec_container(it->first, &it->second);
mark_as_known(it->first, &it->second);
auto query = std::move(it->second.query);
auto query = std::move(it->second.net_query_);
LOG(DEBUG) << "Drop answer for " << query;
query->set_message_id(0);
query->cancel_slot_.clear_event();
@ -583,11 +583,11 @@ Status Session::on_pong() {
auto query = Query::from_list_node(it);
if (Timestamp::at(query->sent_at_ + MAX_QUERY_TIMEOUT).is_in_past()) {
if (status.is_ok()) {
status =
Status::Error(PSLICE() << "No answer from auth key " << auth_data_.get_auth_key().id() << " for "
<< query->query << " for " << format::as_time(Time::now() - query->sent_at_));
status = Status::Error(PSLICE()
<< "No answer from auth key " << auth_data_.get_auth_key().id() << " for "
<< query->net_query_ << " for " << format::as_time(Time::now() - query->sent_at_));
}
query->ack = false;
query->is_acknowledged_ = false;
} else {
break;
}
@ -670,7 +670,7 @@ void Session::on_closed(Status status) {
// resend all queries without ack
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
if (!it->second.ack && it->second.connection_id == current_info_->connection_id_) {
if (!it->second.is_acknowledged_ && it->second.connection_id_ == current_info_->connection_id_) {
// container vector leak otherwise
cleanup_container(it->first, &it->second);
@ -679,7 +679,7 @@ void Session::on_closed(Status status) {
cleanup_container(it->first, &it->second);
mark_as_known(it->first, &it->second);
auto &query = it->second.query;
auto &query = it->second.net_query_;
VLOG(net_query) << "Resend query (on_disconnected, no ack) " << query;
query->set_message_id(0);
query->cancel_slot_.clear_event();
@ -716,12 +716,12 @@ void Session::on_session_created(uint64 unique_id, uint64 first_message_id) {
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
Query *query_ptr = &it->second;
if (query_ptr->container_message_id < first_message_id) {
if (query_ptr->container_message_id_ < first_message_id) {
// container vector leak otherwise
cleanup_container(it->first, &it->second);
mark_as_known(it->first, &it->second);
auto &query = it->second.query;
auto &query = it->second.net_query_;
VLOG(net_query) << "Resend query (on_session_created) " << query;
query->set_message_id(0);
query->cancel_slot_.clear_event();
@ -752,7 +752,7 @@ void Session::on_container_sent(uint64 container_message_id, vector<uint64> mess
if (it == sent_queries_.end()) {
return true; // remove
}
it->second.container_message_id = container_message_id;
it->second.container_message_id_ = container_message_id;
return false;
});
if (message_ids.empty()) {
@ -786,13 +786,13 @@ void Session::on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_c
if (it == sent_queries_.end()) {
return;
}
VLOG(net_query) << "Ack " << it->second.query;
it->second.ack = true;
VLOG(net_query) << "Ack " << it->second.net_query_;
it->second.is_acknowledged_ = true;
{
auto lock = it->second.query->lock();
it->second.query->get_data_unsafe().ack_state_ |= type;
auto lock = it->second.net_query_->lock();
it->second.net_query_->get_data_unsafe().ack_state_ |= type;
}
it->second.query->quick_ack_promise_.set_value(Unit());
it->second.net_query_->quick_ack_promise_.set_value(Unit());
if (!in_container) {
cleanup_container(message_id, &it->second);
}
@ -800,11 +800,11 @@ void Session::on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_c
}
void Session::dec_container(uint64 container_message_id, Query *query) {
if (query->container_message_id == container_message_id) {
if (query->container_message_id_ == container_message_id) {
// message was sent without any container
return;
}
auto it = sent_containers_.find(query->container_message_id);
auto it = sent_containers_.find(query->container_message_id_);
if (it == sent_containers_.end()) {
return;
}
@ -816,26 +816,26 @@ void Session::dec_container(uint64 container_message_id, Query *query) {
}
void Session::cleanup_container(uint64 container_message_id, Query *query) {
if (query->container_message_id == container_message_id) {
if (query->container_message_id_ == container_message_id) {
// message was sent without any container
return;
}
// we can forget container now, since we have an answer for its part.
// TODO: we can do it only for one element per container
sent_containers_.erase(query->container_message_id);
sent_containers_.erase(query->container_message_id_);
}
void Session::mark_as_known(uint64 message_id, Query *query) {
{
auto lock = query->query->lock();
query->query->get_data_unsafe().unknown_state_ = false;
auto lock = query->net_query_->lock();
query->net_query_->get_data_unsafe().unknown_state_ = false;
}
if (!query->unknown) {
if (!query->is_unknown_) {
return;
}
VLOG(net_query) << "Mark as known " << query->query;
query->unknown = false;
VLOG(net_query) << "Mark as known " << query->net_query_;
query->is_unknown_ = false;
unknown_queries_.erase(message_id);
if (unknown_queries_.empty()) {
flush_pending_invoke_after_queries();
@ -844,14 +844,14 @@ void Session::mark_as_known(uint64 message_id, Query *query) {
void Session::mark_as_unknown(uint64 message_id, Query *query) {
{
auto lock = query->query->lock();
query->query->get_data_unsafe().unknown_state_ = true;
auto lock = query->net_query_->lock();
query->net_query_->get_data_unsafe().unknown_state_ = true;
}
if (query->unknown) {
if (query->is_unknown_) {
return;
}
VLOG(net_query) << "Mark as unknown " << query->query;
query->unknown = true;
VLOG(net_query) << "Mark as unknown " << query->net_query_;
query->is_unknown_ = true;
CHECK(message_id != 0);
unknown_queries_.insert(message_id);
}
@ -894,7 +894,7 @@ Status Session::on_message_result_ok(uint64 message_id, BufferSlice packet, size
auth_data_.on_api_response();
Query *query_ptr = &it->second;
VLOG(net_query) << "Return query result " << query_ptr->query;
VLOG(net_query) << "Return query result " << query_ptr->net_query_;
if (!parser.get_error()) {
// Steal authorization information.
@ -902,7 +902,7 @@ Status Session::on_message_result_ok(uint64 message_id, BufferSlice packet, size
if (response_id == telegram_api::auth_authorization::ID ||
response_id == telegram_api::auth_loginTokenSuccess::ID ||
response_id == telegram_api::auth_sentCodeSuccess::ID) {
if (query_ptr->query->tl_constructor() != telegram_api::auth_importAuthorization::ID) {
if (query_ptr->net_query_->tl_constructor() != telegram_api::auth_importAuthorization::ID) {
G()->net_query_dispatcher().set_main_dc_id(raw_dc_id_);
}
auth_data_.set_auth_flag(true);
@ -912,11 +912,11 @@ Status Session::on_message_result_ok(uint64 message_id, BufferSlice packet, size
cleanup_container(message_id, query_ptr);
mark_as_known(message_id, query_ptr);
query_ptr->query->on_net_read(original_size);
query_ptr->query->set_ok(std::move(packet));
query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event();
return_query(std::move(query_ptr->query));
query_ptr->net_query_->on_net_read(original_size);
query_ptr->net_query_->set_ok(std::move(packet));
query_ptr->net_query_->set_message_id(0);
query_ptr->net_query_->cancel_slot_.clear_event();
return_query(std::move(query_ptr->net_query_));
sent_queries_.erase(it);
return Status::OK();
@ -996,14 +996,14 @@ void Session::on_message_result_error(uint64 message_id, int error_code, string
}
Query *query_ptr = &it->second;
VLOG(net_query) << "Return query error " << query_ptr->query;
VLOG(net_query) << "Return query error " << query_ptr->net_query_;
cleanup_container(message_id, query_ptr);
mark_as_known(message_id, query_ptr);
query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str());
query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event();
return_query(std::move(query_ptr->query));
query_ptr->net_query_->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str());
query_ptr->net_query_->set_message_id(0);
query_ptr->net_query_->cancel_slot_.clear_event();
return_query(std::move(query_ptr->net_query_));
sent_queries_.erase(it);
}
@ -1021,10 +1021,10 @@ void Session::on_message_failed_inner(uint64 message_id, bool in_container) {
}
mark_as_known(message_id, query_ptr);
query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event();
query_ptr->query->debug_send_failed();
resend_query(std::move(query_ptr->query));
query_ptr->net_query_->set_message_id(0);
query_ptr->net_query_->cancel_slot_.clear_event();
query_ptr->net_query_->debug_send_failed();
resend_query(std::move(query_ptr->net_query_));
sent_queries_.erase(it);
}
@ -1049,11 +1049,11 @@ void Session::on_message_failed(uint64 message_id, Status status) {
void Session::on_message_info(uint64 message_id, int32 state, uint64 answer_message_id, int32 answer_size) {
auto it = sent_queries_.find(message_id);
if (it != sent_queries_.end()) {
if (it->second.query->update_is_ready()) {
if (it->second.net_query_->update_is_ready()) {
dec_container(it->first, &it->second);
mark_as_known(it->first, &it->second);
auto query = std::move(it->second.query);
auto query = std::move(it->second.net_query_);
query->set_message_id(0);
query->cancel_slot_.clear_event();
sent_queries_.erase(it);
@ -1090,8 +1090,8 @@ void Session::on_message_info(uint64 message_id, int32 state, uint64 answer_mess
if (answer_message_id != 0) {
if (it != sent_queries_.end()) {
VLOG_IF(net_query, message_id != 0) << "Resend answer " << tag("answer_message_id", answer_message_id)
<< tag("answer_size", answer_size) << it->second.query;
it->second.query->debug(PSTRING() << get_name() << ": resend answer");
<< tag("answer_size", answer_size) << it->second.net_query_;
it->second.net_query_->debug(PSTRING() << get_name() << ": resend answer");
}
current_info_->connection_->resend_answer(answer_message_id);
}

View File

@ -78,16 +78,20 @@ class Session final
private:
struct Query final : private ListNode {
uint64 container_message_id;
NetQueryPtr query;
uint64 container_message_id_;
NetQueryPtr net_query_;
bool ack = false;
bool unknown = false;
bool is_acknowledged_ = false;
bool is_unknown_ = false;
int8 connection_id;
const int8 connection_id_;
double sent_at_;
Query(uint64 message_id, NetQueryPtr &&q, int8 connection_id, double sent_at)
: container_message_id(message_id), query(std::move(q)), connection_id(connection_id), sent_at_(sent_at) {
Query(uint64 message_id, NetQueryPtr &&net_query, int8 connection_id, double sent_at)
: container_message_id_(message_id)
, net_query_(std::move(net_query))
, connection_id_(connection_id)
, sent_at_(sent_at) {
}
ListNode *get_list_node() {