Improve message_id variable names in Session.

This commit is contained in:
levlam 2023-01-15 23:27:58 +03:00
parent 8d8f72a17a
commit c508e54a49
2 changed files with 95 additions and 93 deletions

View File

@ -693,9 +693,9 @@ void Session::on_closed(Status status) {
current_info_->state_ = ConnectionInfo::State::Empty; current_info_->state_ = ConnectionInfo::State::Empty;
} }
void Session::on_session_created(uint64 unique_id, uint64 first_id) { void Session::on_session_created(uint64 unique_id, uint64 first_message_id) {
// TODO: use unique_id // TODO: use unique_id
LOG(INFO) << "New session " << unique_id << " created with first message_id " << first_id; LOG(INFO) << "New session " << unique_id << " created with first message_id " << first_message_id;
if (!use_pfs_ && !auth_data_.use_pfs()) { if (!use_pfs_ && !auth_data_.use_pfs()) {
last_success_timestamp_ = Time::now(); last_success_timestamp_ = Time::now();
} }
@ -709,7 +709,7 @@ void Session::on_session_created(uint64 unique_id, uint64 first_id) {
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) { for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
Query *query_ptr = &it->second; Query *query_ptr = &it->second;
if (query_ptr->container_id < first_id) { if (query_ptr->container_message_id < first_message_id) {
// container vector leak otherwise // container vector leak otherwise
cleanup_container(it->first, &it->second); cleanup_container(it->first, &it->second);
mark_as_known(it->first, &it->second); mark_as_known(it->first, &it->second);
@ -737,30 +737,30 @@ void Session::on_session_failed(Status status) {
callback_->on_failed(); callback_->on_failed();
} }
void Session::on_container_sent(uint64 container_id, vector<uint64> msg_ids) { void Session::on_container_sent(uint64 container_message_id, vector<uint64> message_ids) {
CHECK(container_id != 0); CHECK(container_message_id != 0);
td::remove_if(msg_ids, [&](uint64 msg_id) { td::remove_if(message_ids, [&](uint64 message_id) {
auto it = sent_queries_.find(msg_id); auto it = sent_queries_.find(message_id);
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
return true; // remove return true; // remove
} }
it->second.container_id = container_id; it->second.container_message_id = container_message_id;
return false; return false;
}); });
if (msg_ids.empty()) { if (message_ids.empty()) {
return; return;
} }
auto size = msg_ids.size(); auto size = message_ids.size();
sent_containers_.emplace(container_id, ContainerInfo{size, std::move(msg_ids)}); sent_containers_.emplace(container_message_id, ContainerInfo{size, std::move(message_ids)});
} }
void Session::on_message_ack(uint64 id) { void Session::on_message_ack(uint64 message_id) {
on_message_ack_impl(id, 1); on_message_ack_impl(message_id, 1);
} }
void Session::on_message_ack_impl(uint64 id, int32 type) { void Session::on_message_ack_impl(uint64 container_message_id, int32 type) {
auto cit = sent_containers_.find(id); auto cit = sent_containers_.find(container_message_id);
if (cit != sent_containers_.end()) { if (cit != sent_containers_.end()) {
auto container_info = std::move(cit->second); auto container_info = std::move(cit->second);
sent_containers_.erase(cit); sent_containers_.erase(cit);
@ -771,15 +771,15 @@ void Session::on_message_ack_impl(uint64 id, int32 type) {
return; return;
} }
on_message_ack_impl_inner(id, type, false); on_message_ack_impl_inner(container_message_id, type, false);
} }
void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container) { void Session::on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_container) {
auto it = sent_queries_.find(id); auto it = sent_queries_.find(message_id);
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
return; return;
} }
VLOG(net_query) << "Ack " << tag("msg_id", id) << it->second.query; VLOG(net_query) << "Ack " << tag("message_id", message_id) << it->second.query;
it->second.ack = true; it->second.ack = true;
{ {
auto lock = it->second.query->lock(); auto lock = it->second.query->lock();
@ -787,17 +787,17 @@ void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container
} }
it->second.query->quick_ack_promise_.set_value(Unit()); it->second.query->quick_ack_promise_.set_value(Unit());
if (!in_container) { if (!in_container) {
cleanup_container(id, &it->second); cleanup_container(message_id, &it->second);
} }
mark_as_known(it->first, &it->second); mark_as_known(it->first, &it->second);
} }
void Session::dec_container(uint64 message_id, Query *query) { void Session::dec_container(uint64 container_message_id, Query *query) {
if (query->container_id == message_id) { if (query->container_message_id == container_message_id) {
// message was sent without any container // message was sent without any container
return; return;
} }
auto it = sent_containers_.find(query->container_id); auto it = sent_containers_.find(query->container_message_id);
if (it == sent_containers_.end()) { if (it == sent_containers_.end()) {
return; return;
} }
@ -808,18 +808,18 @@ void Session::dec_container(uint64 message_id, Query *query) {
} }
} }
void Session::cleanup_container(uint64 message_id, Query *query) { void Session::cleanup_container(uint64 container_message_id, Query *query) {
if (query->container_id == message_id) { if (query->container_message_id == container_message_id) {
// message was sent without any container // message was sent without any container
return; return;
} }
// we can forget container now, since we have an answer for its part. // we can forget container now, since we have an answer for its part.
// TODO: we can do it only for one element per container // TODO: we can do it only for one element per container
sent_containers_.erase(query->container_id); sent_containers_.erase(query->container_message_id);
} }
void Session::mark_as_known(uint64 id, Query *query) { void Session::mark_as_known(uint64 message_id, Query *query) {
{ {
auto lock = query->query->lock(); auto lock = query->query->lock();
query->query->get_data_unsafe().unknown_state_ = false; query->query->get_data_unsafe().unknown_state_ = false;
@ -827,15 +827,15 @@ void Session::mark_as_known(uint64 id, Query *query) {
if (!query->unknown) { if (!query->unknown) {
return; return;
} }
VLOG(net_query) << "Mark as known " << tag("msg_id", id) << query->query; VLOG(net_query) << "Mark as known " << tag("message_id", message_id) << query->query;
query->unknown = false; query->unknown = false;
unknown_queries_.erase(id); unknown_queries_.erase(message_id);
if (unknown_queries_.empty()) { if (unknown_queries_.empty()) {
flush_pending_invoke_after_queries(); flush_pending_invoke_after_queries();
} }
} }
void Session::mark_as_unknown(uint64 id, Query *query) { void Session::mark_as_unknown(uint64 message_id, Query *query) {
{ {
auto lock = query->query->lock(); auto lock = query->query->lock();
query->query->get_data_unsafe().unknown_state_ = true; query->query->get_data_unsafe().unknown_state_ = true;
@ -843,10 +843,10 @@ void Session::mark_as_unknown(uint64 id, Query *query) {
if (query->unknown) { if (query->unknown) {
return; return;
} }
VLOG(net_query) << "Mark as unknown " << tag("msg_id", id) << query->query; VLOG(net_query) << "Mark as unknown " << tag("message_id", message_id) << query->query;
query->unknown = true; query->unknown = true;
CHECK(id != 0); CHECK(message_id != 0);
unknown_queries_.insert(id); unknown_queries_.insert(message_id);
} }
Status Session::on_update(BufferSlice packet) { Status Session::on_update(BufferSlice packet) {
@ -862,16 +862,16 @@ Status Session::on_update(BufferSlice packet) {
return Status::OK(); return Status::OK();
} }
Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) { Status Session::on_message_result_ok(uint64 message_id, BufferSlice packet, size_t original_size) {
last_success_timestamp_ = Time::now(); last_success_timestamp_ = Time::now();
TlParser parser(packet.as_slice()); TlParser parser(packet.as_slice());
int32 ID = parser.fetch_int(); int32 ID = parser.fetch_int();
auto it = sent_queries_.find(id); auto it = sent_queries_.find(message_id);
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
LOG(DEBUG) << "Drop result to " << tag("request_id", format::as_hex(id)) << tag("original_size", original_size) LOG(DEBUG) << "Drop result to " << tag("message_id", format::as_hex(message_id))
<< tag("tl", format::as_hex(ID)); << tag("original_size", original_size) << tag("tl", format::as_hex(ID));
if (original_size > 16 * 1024) { if (original_size > 16 * 1024) {
dropped_size_ += original_size; dropped_size_ += original_size;
@ -901,8 +901,8 @@ Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t origi
} }
} }
cleanup_container(id, query_ptr); cleanup_container(message_id, query_ptr);
mark_as_known(id, query_ptr); mark_as_known(message_id, query_ptr);
query_ptr->query->on_net_read(original_size); query_ptr->query->on_net_read(original_size);
query_ptr->query->set_ok(std::move(packet)); query_ptr->query->set_ok(std::move(packet));
query_ptr->query->set_message_id(0); query_ptr->query->set_message_id(0);
@ -913,7 +913,7 @@ Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t origi
return Status::OK(); return Status::OK();
} }
void Session::on_message_result_error(uint64 id, int error_code, string message) { void Session::on_message_result_error(uint64 message_id, int error_code, string message) {
if (!check_utf8(message)) { if (!check_utf8(message)) {
LOG(ERROR) << "Receive invalid error message \"" << message << '"'; LOG(ERROR) << "Receive invalid error message \"" << message << '"';
message = "INVALID_UTF8_ERROR_MESSAGE"; message = "INVALID_UTF8_ERROR_MESSAGE";
@ -970,7 +970,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message)
error_code = 500; error_code = 500;
} }
if (id == 0) { if (message_id == 0) {
LOG(ERROR) << "Received an error update"; LOG(ERROR) << "Received an error update";
return; return;
} }
@ -982,7 +982,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message)
} else { } else {
LOG(DEBUG) << "Receive error " << error_code << " : " << message; LOG(DEBUG) << "Receive error " << error_code << " : " << message;
} }
auto it = sent_queries_.find(id); auto it = sent_queries_.find(message_id);
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
current_info_->connection_->force_ack(); current_info_->connection_->force_ack();
return; return;
@ -991,8 +991,8 @@ void Session::on_message_result_error(uint64 id, int error_code, string message)
Query *query_ptr = &it->second; Query *query_ptr = &it->second;
VLOG(net_query) << "Return query error " << query_ptr->query; VLOG(net_query) << "Return query error " << query_ptr->query;
cleanup_container(id, query_ptr); cleanup_container(message_id, query_ptr);
mark_as_known(id, query_ptr); mark_as_known(message_id, query_ptr);
query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str()); query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str());
query_ptr->query->set_message_id(0); query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event(); query_ptr->query->cancel_slot_.clear_event();
@ -1001,18 +1001,18 @@ void Session::on_message_result_error(uint64 id, int error_code, string message)
sent_queries_.erase(it); sent_queries_.erase(it);
} }
void Session::on_message_failed_inner(uint64 id, bool in_container) { void Session::on_message_failed_inner(uint64 message_id, bool in_container) {
LOG(INFO) << "Message inner failed " << id; LOG(INFO) << "Message inner failed " << message_id;
auto it = sent_queries_.find(id); auto it = sent_queries_.find(message_id);
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
return; return;
} }
Query *query_ptr = &it->second; Query *query_ptr = &it->second;
if (!in_container) { if (!in_container) {
cleanup_container(id, query_ptr); cleanup_container(message_id, query_ptr);
} }
mark_as_known(id, query_ptr); mark_as_known(message_id, query_ptr);
query_ptr->query->set_message_id(0); query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event(); query_ptr->query->cancel_slot_.clear_event();
@ -1021,26 +1021,26 @@ void Session::on_message_failed_inner(uint64 id, bool in_container) {
sent_queries_.erase(it); sent_queries_.erase(it);
} }
void Session::on_message_failed(uint64 id, Status status) { void Session::on_message_failed(uint64 message_id, Status status) {
LOG(INFO) << "Message failed: " << tag("id", id) << tag("status", status); LOG(INFO) << "Message failed: " << tag("message_id", message_id) << tag("status", status);
status.ignore(); status.ignore();
auto cit = sent_containers_.find(id); auto cit = sent_containers_.find(message_id);
if (cit != sent_containers_.end()) { if (cit != sent_containers_.end()) {
auto container_info = std::move(cit->second); auto container_info = std::move(cit->second);
sent_containers_.erase(cit); sent_containers_.erase(cit);
for (auto message_id : container_info.message_ids) { for (auto contained_message_id : container_info.message_ids) {
on_message_failed_inner(message_id, true); on_message_failed_inner(contained_message_id, true);
} }
return; return;
} }
on_message_failed_inner(id, false); on_message_failed_inner(message_id, false);
} }
void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) { void Session::on_message_info(uint64 message_id, int32 state, uint64 answer_message_id, int32 answer_size) {
auto it = sent_queries_.find(id); auto it = sent_queries_.find(message_id);
if (it != sent_queries_.end()) { if (it != sent_queries_.end()) {
if (it->second.query->update_is_ready()) { if (it->second.query->update_is_ready()) {
dec_container(it->first, &it->second); dec_container(it->first, &it->second);
@ -1054,7 +1054,7 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an
return; return;
} }
} }
if (id != 0) { if (message_id != 0) {
if (it == sent_queries_.end()) { if (it == sent_queries_.end()) {
return; return;
} }
@ -1063,30 +1063,31 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an
case 2: case 2:
case 3: case 3:
// message not received by server // message not received by server
return on_message_failed(id, Status::Error("Unknown message identifier")); return on_message_failed(message_id, Status::Error("Unknown message identifier"));
case 0: case 0:
if (answer_id == 0) { if (answer_message_id == 0) {
LOG(ERROR) << "Unexpected message_info.state == 0 " << tag("id", id) << tag("state", state) LOG(ERROR) << "Unexpected message_info.state == 0 " << tag("message_id", message_id) << tag("state", state)
<< tag("answer_id", answer_id); << tag("answer_message_id", answer_message_id);
return on_message_failed(id, Status::Error("Unexpected message_info.state == 0")); return on_message_failed(message_id, Status::Error("Unexpected message_info.state == 0"));
} }
// fallthrough // fallthrough
case 4: case 4:
on_message_ack_impl(id, 2); on_message_ack_impl(message_id, 2);
break; break;
default: default:
LOG(ERROR) << "Invalid message info " << tag("state", state); LOG(ERROR) << "Invalid message info " << tag("state", state);
} }
} }
// ok, we are waiting for result of id. let's ask to resend it // ok, we are waiting for result of message_id. let's ask to resend it
if (answer_id != 0) { if (answer_message_id != 0) {
if (it != sent_queries_.end()) { if (it != sent_queries_.end()) {
VLOG_IF(net_query, id != 0) << "Resend answer " << tag("msg_id", id) << tag("answer_id", answer_id) VLOG_IF(net_query, message_id != 0)
<< tag("answer_size", answer_size) << it->second.query; << "Resend answer " << tag("message_id", message_id) << tag("answer_message_id", answer_message_id)
<< tag("answer_size", answer_size) << it->second.query;
it->second.query->debug("Session: resend answer"); it->second.query->debug("Session: resend answer");
} }
current_info_->connection_->resend_answer(answer_id); current_info_->connection_->resend_answer(answer_message_id);
} }
} }
@ -1160,9 +1161,10 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
message_id = auth_data_.next_message_id(Time::now_cached()); message_id = auth_data_.next_message_id(Time::now_cached());
} }
} }
VLOG(net_query) << "Send query to connection " << net_query << " [msg_id:" << format::as_hex(message_id) << "]" VLOG(net_query) << "Send query to connection " << net_query << " [message_id:" << format::as_hex(message_id) << "]"
<< tag("invoke_after", << tag("invoke_after", transform(invoke_after_ids, [](auto message_id) {
transform(invoke_after_ids, [](auto id) { return PSTRING() << format::as_hex(id); })); return PSTRING() << format::as_hex(message_id);
}));
net_query->set_message_id(message_id); net_query->set_message_id(message_id);
net_query->cancel_slot_.clear_event(); net_query->cancel_slot_.clear_event();
{ {
@ -1306,11 +1308,11 @@ void Session::connection_open_finish(ConnectionInfo *info,
return; return;
} }
if (info->ask_info_) { if (info->ask_info_) {
for (auto &id : unknown_queries_) { for (auto &message_id : unknown_queries_) {
info->connection_->get_state_info(id); info->connection_->get_state_info(message_id);
} }
for (auto &id : to_cancel_) { for (auto &message_id : to_cancel_) {
info->connection_->cancel_answer(id); info->connection_->cancel_answer(message_id);
} }
to_cancel_.clear(); to_cancel_.clear();
} }

View File

@ -78,7 +78,7 @@ class Session final
private: private:
struct Query final : private ListNode { struct Query final : private ListNode {
uint64 container_id; uint64 container_message_id;
NetQueryPtr query; NetQueryPtr query;
bool ack = false; bool ack = false;
@ -87,7 +87,7 @@ class Session final
int8 connection_id; int8 connection_id;
double sent_at_; double sent_at_;
Query(uint64 message_id, NetQueryPtr &&q, int8 connection_id, double sent_at) Query(uint64 message_id, NetQueryPtr &&q, int8 connection_id, double sent_at)
: container_id(message_id), query(std::move(q)), connection_id(connection_id), sent_at_(sent_at) { : container_message_id(message_id), query(std::move(q)), connection_id(connection_id), sent_at_(sent_at) {
} }
ListNode *get_list_node() { ListNode *get_list_node() {
@ -101,7 +101,7 @@ class Session final
// When connection is closed, mark all queries without ack as unknown. // When connection is closed, mark all queries without ack as unknown.
// Ask state of all unknown queries when new connection is created. // Ask state of all unknown queries when new connection is created.
// //
// Just re-ask answer_id each time we get information about it. // Just re-ask answer_message_id each time we get information about it.
// Though mtproto::Connection must ensure delivery of such query. // Though mtproto::Connection must ensure delivery of such query.
const int32 raw_dc_id_; // numerical datacenter ID, i.e. 2 const int32 raw_dc_id_; // numerical datacenter ID, i.e. 2
@ -209,33 +209,33 @@ class Session final
void on_server_salt_updated() final; void on_server_salt_updated() final;
void on_server_time_difference_updated() final; void on_server_time_difference_updated() final;
void on_session_created(uint64 unique_id, uint64 first_id) final; void on_session_created(uint64 unique_id, uint64 first_message_id) final;
void on_session_failed(Status status) final; void on_session_failed(Status status) final;
void on_container_sent(uint64 container_id, vector<uint64> msg_ids) final; void on_container_sent(uint64 container_message_id, vector<uint64> message_ids) final;
Status on_update(BufferSlice packet) final; Status on_update(BufferSlice packet) final;
void on_message_ack(uint64 id) final; void on_message_ack(uint64 message_id) final;
Status on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) final; Status on_message_result_ok(uint64 message_id, BufferSlice packet, size_t original_size) final;
void on_message_result_error(uint64 id, int error_code, string message) final; void on_message_result_error(uint64 message_id, int error_code, string message) final;
void on_message_failed(uint64 id, Status status) final; void on_message_failed(uint64 message_id, Status status) final;
void on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) final; void on_message_info(uint64 message_id, int32 state, uint64 answer_message_id, int32 answer_size) final;
Status on_destroy_auth_key() final; Status on_destroy_auth_key() final;
void flush_pending_invoke_after_queries(); void flush_pending_invoke_after_queries();
bool has_queries() const; bool has_queries() const;
void dec_container(uint64 message_id, Query *query); void dec_container(uint64 container_message_id, Query *query);
void cleanup_container(uint64 id, Query *query); void cleanup_container(uint64 container_message_id, Query *query);
void mark_as_known(uint64 id, Query *query); void mark_as_known(uint64 message_id, Query *query);
void mark_as_unknown(uint64 id, Query *query); void mark_as_unknown(uint64 message_id, Query *query);
void on_message_ack_impl(uint64 id, int32 type); void on_message_ack_impl(uint64 container_message_id, int32 type);
void on_message_ack_impl_inner(uint64 id, int32 type, bool in_container); void on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_container);
void on_message_failed_inner(uint64 id, bool in_container); void on_message_failed_inner(uint64 message_id, bool in_container);
// send NetQueryPtr to parent // send NetQueryPtr to parent
void return_query(NetQueryPtr &&query); void return_query(NetQueryPtr &&query);