diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 04371800e..b24347d4d 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -693,9 +693,9 @@ void Session::on_closed(Status status) { current_info_->state_ = ConnectionInfo::State::Empty; } -void Session::on_session_created(uint64 unique_id, uint64 first_id) { +void Session::on_session_created(uint64 unique_id, uint64 first_message_id) { // TODO: use unique_id - LOG(INFO) << "New session " << unique_id << " created with first message_id " << first_id; + LOG(INFO) << "New session " << unique_id << " created with first message_id " << first_message_id; if (!use_pfs_ && !auth_data_.use_pfs()) { last_success_timestamp_ = Time::now(); } @@ -709,7 +709,7 @@ void Session::on_session_created(uint64 unique_id, uint64 first_id) { for (auto it = sent_queries_.begin(); it != sent_queries_.end();) { Query *query_ptr = &it->second; - if (query_ptr->container_id < first_id) { + if (query_ptr->container_message_id < first_message_id) { // container vector leak otherwise cleanup_container(it->first, &it->second); mark_as_known(it->first, &it->second); @@ -737,30 +737,30 @@ void Session::on_session_failed(Status status) { callback_->on_failed(); } -void Session::on_container_sent(uint64 container_id, vector msg_ids) { - CHECK(container_id != 0); +void Session::on_container_sent(uint64 container_message_id, vector message_ids) { + CHECK(container_message_id != 0); - td::remove_if(msg_ids, [&](uint64 msg_id) { - auto it = sent_queries_.find(msg_id); + td::remove_if(message_ids, [&](uint64 message_id) { + auto it = sent_queries_.find(message_id); if (it == sent_queries_.end()) { return true; // remove } - it->second.container_id = container_id; + it->second.container_message_id = container_message_id; return false; }); - if (msg_ids.empty()) { + if (message_ids.empty()) { return; } - auto size = msg_ids.size(); - sent_containers_.emplace(container_id, ContainerInfo{size, std::move(msg_ids)}); + auto size = message_ids.size(); + sent_containers_.emplace(container_message_id, ContainerInfo{size, std::move(message_ids)}); } -void Session::on_message_ack(uint64 id) { - on_message_ack_impl(id, 1); +void Session::on_message_ack(uint64 message_id) { + on_message_ack_impl(message_id, 1); } -void Session::on_message_ack_impl(uint64 id, int32 type) { - auto cit = sent_containers_.find(id); +void Session::on_message_ack_impl(uint64 container_message_id, int32 type) { + auto cit = sent_containers_.find(container_message_id); if (cit != sent_containers_.end()) { auto container_info = std::move(cit->second); sent_containers_.erase(cit); @@ -771,15 +771,15 @@ void Session::on_message_ack_impl(uint64 id, int32 type) { return; } - on_message_ack_impl_inner(id, type, false); + on_message_ack_impl_inner(container_message_id, type, false); } -void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container) { - auto it = sent_queries_.find(id); +void Session::on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_container) { + auto it = sent_queries_.find(message_id); if (it == sent_queries_.end()) { return; } - VLOG(net_query) << "Ack " << tag("msg_id", id) << it->second.query; + VLOG(net_query) << "Ack " << tag("message_id", message_id) << it->second.query; it->second.ack = true; { auto lock = it->second.query->lock(); @@ -787,17 +787,17 @@ void Session::on_message_ack_impl_inner(uint64 id, int32 type, bool in_container } it->second.query->quick_ack_promise_.set_value(Unit()); if (!in_container) { - cleanup_container(id, &it->second); + cleanup_container(message_id, &it->second); } mark_as_known(it->first, &it->second); } -void Session::dec_container(uint64 message_id, Query *query) { - if (query->container_id == message_id) { +void Session::dec_container(uint64 container_message_id, Query *query) { + if (query->container_message_id == container_message_id) { // message was sent without any container return; } - auto it = sent_containers_.find(query->container_id); + auto it = sent_containers_.find(query->container_message_id); if (it == sent_containers_.end()) { return; } @@ -808,18 +808,18 @@ void Session::dec_container(uint64 message_id, Query *query) { } } -void Session::cleanup_container(uint64 message_id, Query *query) { - if (query->container_id == message_id) { +void Session::cleanup_container(uint64 container_message_id, Query *query) { + if (query->container_message_id == container_message_id) { // message was sent without any container return; } // we can forget container now, since we have an answer for its part. // TODO: we can do it only for one element per container - sent_containers_.erase(query->container_id); + sent_containers_.erase(query->container_message_id); } -void Session::mark_as_known(uint64 id, Query *query) { +void Session::mark_as_known(uint64 message_id, Query *query) { { auto lock = query->query->lock(); query->query->get_data_unsafe().unknown_state_ = false; @@ -827,15 +827,15 @@ void Session::mark_as_known(uint64 id, Query *query) { if (!query->unknown) { return; } - VLOG(net_query) << "Mark as known " << tag("msg_id", id) << query->query; + VLOG(net_query) << "Mark as known " << tag("message_id", message_id) << query->query; query->unknown = false; - unknown_queries_.erase(id); + unknown_queries_.erase(message_id); if (unknown_queries_.empty()) { flush_pending_invoke_after_queries(); } } -void Session::mark_as_unknown(uint64 id, Query *query) { +void Session::mark_as_unknown(uint64 message_id, Query *query) { { auto lock = query->query->lock(); query->query->get_data_unsafe().unknown_state_ = true; @@ -843,10 +843,10 @@ void Session::mark_as_unknown(uint64 id, Query *query) { if (query->unknown) { return; } - VLOG(net_query) << "Mark as unknown " << tag("msg_id", id) << query->query; + VLOG(net_query) << "Mark as unknown " << tag("message_id", message_id) << query->query; query->unknown = true; - CHECK(id != 0); - unknown_queries_.insert(id); + CHECK(message_id != 0); + unknown_queries_.insert(message_id); } Status Session::on_update(BufferSlice packet) { @@ -862,16 +862,16 @@ Status Session::on_update(BufferSlice packet) { return Status::OK(); } -Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) { +Status Session::on_message_result_ok(uint64 message_id, BufferSlice packet, size_t original_size) { last_success_timestamp_ = Time::now(); TlParser parser(packet.as_slice()); int32 ID = parser.fetch_int(); - auto it = sent_queries_.find(id); + auto it = sent_queries_.find(message_id); if (it == sent_queries_.end()) { - LOG(DEBUG) << "Drop result to " << tag("request_id", format::as_hex(id)) << tag("original_size", original_size) - << tag("tl", format::as_hex(ID)); + LOG(DEBUG) << "Drop result to " << tag("message_id", format::as_hex(message_id)) + << tag("original_size", original_size) << tag("tl", format::as_hex(ID)); if (original_size > 16 * 1024) { dropped_size_ += original_size; @@ -901,8 +901,8 @@ Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t origi } } - cleanup_container(id, query_ptr); - mark_as_known(id, query_ptr); + cleanup_container(message_id, query_ptr); + mark_as_known(message_id, query_ptr); query_ptr->query->on_net_read(original_size); query_ptr->query->set_ok(std::move(packet)); query_ptr->query->set_message_id(0); @@ -913,7 +913,7 @@ Status Session::on_message_result_ok(uint64 id, BufferSlice packet, size_t origi return Status::OK(); } -void Session::on_message_result_error(uint64 id, int error_code, string message) { +void Session::on_message_result_error(uint64 message_id, int error_code, string message) { if (!check_utf8(message)) { LOG(ERROR) << "Receive invalid error message \"" << message << '"'; message = "INVALID_UTF8_ERROR_MESSAGE"; @@ -970,7 +970,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message) error_code = 500; } - if (id == 0) { + if (message_id == 0) { LOG(ERROR) << "Received an error update"; return; } @@ -982,7 +982,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message) } else { LOG(DEBUG) << "Receive error " << error_code << " : " << message; } - auto it = sent_queries_.find(id); + auto it = sent_queries_.find(message_id); if (it == sent_queries_.end()) { current_info_->connection_->force_ack(); return; @@ -991,8 +991,8 @@ void Session::on_message_result_error(uint64 id, int error_code, string message) Query *query_ptr = &it->second; VLOG(net_query) << "Return query error " << query_ptr->query; - cleanup_container(id, query_ptr); - mark_as_known(id, query_ptr); + cleanup_container(message_id, query_ptr); + mark_as_known(message_id, query_ptr); query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str()); query_ptr->query->set_message_id(0); query_ptr->query->cancel_slot_.clear_event(); @@ -1001,18 +1001,18 @@ void Session::on_message_result_error(uint64 id, int error_code, string message) sent_queries_.erase(it); } -void Session::on_message_failed_inner(uint64 id, bool in_container) { - LOG(INFO) << "Message inner failed " << id; - auto it = sent_queries_.find(id); +void Session::on_message_failed_inner(uint64 message_id, bool in_container) { + LOG(INFO) << "Message inner failed " << message_id; + auto it = sent_queries_.find(message_id); if (it == sent_queries_.end()) { return; } Query *query_ptr = &it->second; if (!in_container) { - cleanup_container(id, query_ptr); + cleanup_container(message_id, query_ptr); } - mark_as_known(id, query_ptr); + mark_as_known(message_id, query_ptr); query_ptr->query->set_message_id(0); query_ptr->query->cancel_slot_.clear_event(); @@ -1021,26 +1021,26 @@ void Session::on_message_failed_inner(uint64 id, bool in_container) { sent_queries_.erase(it); } -void Session::on_message_failed(uint64 id, Status status) { - LOG(INFO) << "Message failed: " << tag("id", id) << tag("status", status); +void Session::on_message_failed(uint64 message_id, Status status) { + LOG(INFO) << "Message failed: " << tag("message_id", message_id) << tag("status", status); status.ignore(); - auto cit = sent_containers_.find(id); + auto cit = sent_containers_.find(message_id); if (cit != sent_containers_.end()) { auto container_info = std::move(cit->second); sent_containers_.erase(cit); - for (auto message_id : container_info.message_ids) { - on_message_failed_inner(message_id, true); + for (auto contained_message_id : container_info.message_ids) { + on_message_failed_inner(contained_message_id, true); } return; } - on_message_failed_inner(id, false); + on_message_failed_inner(message_id, false); } -void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) { - auto it = sent_queries_.find(id); +void Session::on_message_info(uint64 message_id, int32 state, uint64 answer_message_id, int32 answer_size) { + auto it = sent_queries_.find(message_id); if (it != sent_queries_.end()) { if (it->second.query->update_is_ready()) { dec_container(it->first, &it->second); @@ -1054,7 +1054,7 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an return; } } - if (id != 0) { + if (message_id != 0) { if (it == sent_queries_.end()) { return; } @@ -1063,30 +1063,31 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an case 2: case 3: // message not received by server - return on_message_failed(id, Status::Error("Unknown message identifier")); + return on_message_failed(message_id, Status::Error("Unknown message identifier")); case 0: - if (answer_id == 0) { - LOG(ERROR) << "Unexpected message_info.state == 0 " << tag("id", id) << tag("state", state) - << tag("answer_id", answer_id); - return on_message_failed(id, Status::Error("Unexpected message_info.state == 0")); + if (answer_message_id == 0) { + LOG(ERROR) << "Unexpected message_info.state == 0 " << tag("message_id", message_id) << tag("state", state) + << tag("answer_message_id", answer_message_id); + return on_message_failed(message_id, Status::Error("Unexpected message_info.state == 0")); } // fallthrough case 4: - on_message_ack_impl(id, 2); + on_message_ack_impl(message_id, 2); break; default: LOG(ERROR) << "Invalid message info " << tag("state", state); } } - // ok, we are waiting for result of id. let's ask to resend it - if (answer_id != 0) { + // ok, we are waiting for result of message_id. let's ask to resend it + if (answer_message_id != 0) { if (it != sent_queries_.end()) { - VLOG_IF(net_query, id != 0) << "Resend answer " << tag("msg_id", id) << tag("answer_id", answer_id) - << tag("answer_size", answer_size) << it->second.query; + VLOG_IF(net_query, message_id != 0) + << "Resend answer " << tag("message_id", message_id) << tag("answer_message_id", answer_message_id) + << tag("answer_size", answer_size) << it->second.query; it->second.query->debug("Session: resend answer"); } - current_info_->connection_->resend_answer(answer_id); + current_info_->connection_->resend_answer(answer_message_id); } } @@ -1160,9 +1161,10 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer message_id = auth_data_.next_message_id(Time::now_cached()); } } - VLOG(net_query) << "Send query to connection " << net_query << " [msg_id:" << format::as_hex(message_id) << "]" - << tag("invoke_after", - transform(invoke_after_ids, [](auto id) { return PSTRING() << format::as_hex(id); })); + VLOG(net_query) << "Send query to connection " << net_query << " [message_id:" << format::as_hex(message_id) << "]" + << tag("invoke_after", transform(invoke_after_ids, [](auto message_id) { + return PSTRING() << format::as_hex(message_id); + })); net_query->set_message_id(message_id); net_query->cancel_slot_.clear_event(); { @@ -1306,11 +1308,11 @@ void Session::connection_open_finish(ConnectionInfo *info, return; } if (info->ask_info_) { - for (auto &id : unknown_queries_) { - info->connection_->get_state_info(id); + for (auto &message_id : unknown_queries_) { + info->connection_->get_state_info(message_id); } - for (auto &id : to_cancel_) { - info->connection_->cancel_answer(id); + for (auto &message_id : to_cancel_) { + info->connection_->cancel_answer(message_id); } to_cancel_.clear(); } diff --git a/td/telegram/net/Session.h b/td/telegram/net/Session.h index 02d1d4dee..ee23102b8 100644 --- a/td/telegram/net/Session.h +++ b/td/telegram/net/Session.h @@ -78,7 +78,7 @@ class Session final private: struct Query final : private ListNode { - uint64 container_id; + uint64 container_message_id; NetQueryPtr query; bool ack = false; @@ -87,7 +87,7 @@ class Session final int8 connection_id; double sent_at_; Query(uint64 message_id, NetQueryPtr &&q, int8 connection_id, double sent_at) - : container_id(message_id), query(std::move(q)), connection_id(connection_id), sent_at_(sent_at) { + : container_message_id(message_id), query(std::move(q)), connection_id(connection_id), sent_at_(sent_at) { } ListNode *get_list_node() { @@ -101,7 +101,7 @@ class Session final // When connection is closed, mark all queries without ack as unknown. // Ask state of all unknown queries when new connection is created. // - // Just re-ask answer_id each time we get information about it. + // Just re-ask answer_message_id each time we get information about it. // Though mtproto::Connection must ensure delivery of such query. const int32 raw_dc_id_; // numerical datacenter ID, i.e. 2 @@ -209,33 +209,33 @@ class Session final void on_server_salt_updated() final; void on_server_time_difference_updated() final; - void on_session_created(uint64 unique_id, uint64 first_id) final; + void on_session_created(uint64 unique_id, uint64 first_message_id) final; void on_session_failed(Status status) final; - void on_container_sent(uint64 container_id, vector msg_ids) final; + void on_container_sent(uint64 container_message_id, vector message_ids) final; Status on_update(BufferSlice packet) final; - void on_message_ack(uint64 id) final; - Status on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) final; - void on_message_result_error(uint64 id, int error_code, string message) final; - void on_message_failed(uint64 id, Status status) final; + void on_message_ack(uint64 message_id) final; + Status on_message_result_ok(uint64 message_id, BufferSlice packet, size_t original_size) final; + void on_message_result_error(uint64 message_id, int error_code, string message) final; + void on_message_failed(uint64 message_id, Status status) final; - void on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) final; + void on_message_info(uint64 message_id, int32 state, uint64 answer_message_id, int32 answer_size) final; Status on_destroy_auth_key() final; void flush_pending_invoke_after_queries(); bool has_queries() const; - void dec_container(uint64 message_id, Query *query); - void cleanup_container(uint64 id, Query *query); - void mark_as_known(uint64 id, Query *query); - void mark_as_unknown(uint64 id, Query *query); + void dec_container(uint64 container_message_id, Query *query); + void cleanup_container(uint64 container_message_id, Query *query); + void mark_as_known(uint64 message_id, Query *query); + void mark_as_unknown(uint64 message_id, Query *query); - void on_message_ack_impl(uint64 id, int32 type); - void on_message_ack_impl_inner(uint64 id, int32 type, bool in_container); - void on_message_failed_inner(uint64 id, bool in_container); + void on_message_ack_impl(uint64 container_message_id, int32 type); + void on_message_ack_impl_inner(uint64 message_id, int32 type, bool in_container); + void on_message_failed_inner(uint64 message_id, bool in_container); // send NetQueryPtr to parent void return_query(NetQueryPtr &&query);