Improve SessionConnection::ServiceQuery.

This commit is contained in:
levlam 2023-09-21 16:52:21 +03:00
parent 5334cc8e10
commit e47cea5904
2 changed files with 26 additions and 25 deletions

View File

@ -304,13 +304,13 @@ Status SessionConnection::on_destroy_auth_key(const mtproto_api::DestroyAuthKeyR
} }
Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::new_session_created &new_session_created) { Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::new_session_created &new_session_created) {
auto first_message_id = new_session_created.first_msg_id_; auto first_message_id = static_cast<uint64>(new_session_created.first_msg_id_);
VLOG(mtproto) << "Receive new_session_created with " << info << ": [first_msg_id:" << format::as_hex(first_message_id) VLOG(mtproto) << "Receive new_session_created with " << info << ": [first_msg_id:" << format::as_hex(first_message_id)
<< "] [unique_id:" << format::as_hex(new_session_created.unique_id_) << ']'; << "] [unique_id:" << format::as_hex(new_session_created.unique_id_) << ']';
auto it = service_queries_.find(first_message_id); auto it = service_queries_.find(first_message_id);
if (it != service_queries_.end()) { if (it != service_queries_.end()) {
first_message_id = it->second.container_message_id; first_message_id = it->second.container_message_id_;
LOG(INFO) << "Update first_message_id to container's " << format::as_hex(first_message_id); LOG(INFO) << "Update first_message_id to container's " << format::as_hex(first_message_id);
} }
@ -432,32 +432,32 @@ Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::futu
return Status::OK(); return Status::OK();
} }
Status SessionConnection::on_msgs_state_info(const vector<int64> &message_ids, Slice info) { Status SessionConnection::on_msgs_state_info(const vector<int64> &msg_ids, Slice info) {
if (message_ids.size() != info.size()) { if (msg_ids.size() != info.size()) {
return Status::Error(PSLICE() << tag("message count", message_ids.size()) return Status::Error(PSLICE() << tag("message count", msg_ids.size()) << " != " << tag("info.size()", info.size()));
<< " != " << tag("info.size()", info.size()));
} }
size_t i = 0; size_t i = 0;
for (auto message_id : message_ids) { for (auto msg_id : msg_ids) {
callback_->on_message_info(static_cast<uint64>(message_id), info[i], 0, 0, 1); callback_->on_message_info(static_cast<uint64>(msg_id), info[i], 0, 0, 1);
i++; i++;
} }
return Status::OK(); return Status::OK();
} }
Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::msgs_state_info &msgs_state_info) { Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::msgs_state_info &msgs_state_info) {
auto it = service_queries_.find(msgs_state_info.req_msg_id_); auto message_id = static_cast<uint64>(msgs_state_info.req_msg_id_);
auto it = service_queries_.find(message_id);
if (it == service_queries_.end()) { if (it == service_queries_.end()) {
return Status::Error("Unknown msgs_state_info"); return Status::Error("Unknown msgs_state_info");
} }
auto query = std::move(it->second); auto query = std::move(it->second);
service_queries_.erase(it); service_queries_.erase(it);
if (query.type != ServiceQuery::GetStateInfo) { if (query.type_ != ServiceQuery::GetStateInfo) {
return Status::Error("Receive msgs_state_info in response not to GetStateInfo"); return Status::Error("Receive msgs_state_info in response not to GetStateInfo");
} }
VLOG(mtproto) << "Receive msgs_state_info with " << info; VLOG(mtproto) << "Receive msgs_state_info with " << info;
return on_msgs_state_info(query.message_ids, msgs_state_info.info_); return on_msgs_state_info(query.msg_ids_, msgs_state_info.info_);
} }
Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::msgs_all_info &msgs_all_info) { Status SessionConnection::on_packet(const MsgInfo &info, const mtproto_api::msgs_all_info &msgs_all_info) {
@ -588,8 +588,8 @@ void SessionConnection::on_message_failed(uint64 message_id, Status status) {
last_ping_container_message_id_ = 0; last_ping_container_message_id_ = 0;
} }
auto cit = container_to_service_msg_.find(message_id); auto cit = container_to_service_message_id_.find(message_id);
if (cit != container_to_service_msg_.end()) { if (cit != container_to_service_message_id_.end()) {
auto message_ids = cit->second; auto message_ids = cit->second;
for (auto inner_message_id : message_ids) { for (auto inner_message_id : message_ids) {
on_message_failed_inner(inner_message_id); on_message_failed_inner(inner_message_id);
@ -607,15 +607,15 @@ void SessionConnection::on_message_failed_inner(uint64 message_id) {
auto query = std::move(it->second); auto query = std::move(it->second);
service_queries_.erase(it); service_queries_.erase(it);
switch (query.type) { switch (query.type_) {
case ServiceQuery::ResendAnswer: case ServiceQuery::ResendAnswer:
for (auto query_message_id : query.message_ids) { for (auto msg_id : query.msg_ids_) {
resend_answer(query_message_id); resend_answer(static_cast<uint64>(msg_id));
} }
break; break;
case ServiceQuery::GetStateInfo: case ServiceQuery::GetStateInfo:
for (auto query_message_id : query.message_ids) { for (auto msg_id : query.msg_ids_) {
get_state_info(query_message_id); get_state_info(static_cast<uint64>(msg_id));
} }
break; break;
default: default:
@ -822,6 +822,7 @@ void SessionConnection::resend_answer(uint64 message_id) {
} }
to_resend_answer_message_ids_.push_back(message_id); to_resend_answer_message_ids_.push_back(message_id);
} }
void SessionConnection::cancel_answer(uint64 message_id) { void SessionConnection::cancel_answer(uint64 message_id) {
if (to_cancel_answer_message_ids_.empty()) { if (to_cancel_answer_message_ids_.empty()) {
send_before(Time::now_cached() + RESEND_ANSWER_DELAY); send_before(Time::now_cached() + RESEND_ANSWER_DELAY);
@ -1028,10 +1029,10 @@ void SessionConnection::flush_packet() {
callback_->on_container_sent(container_message_id, std::move(message_ids)); callback_->on_container_sent(container_message_id, std::move(message_ids));
if (resend_answer_message_id) { if (resend_answer_message_id) {
container_to_service_msg_[container_message_id].push_back(resend_answer_message_id); container_to_service_message_id_[container_message_id].push_back(resend_answer_message_id);
} }
if (get_state_info_message_id) { if (get_state_info_message_id) {
container_to_service_msg_[container_message_id].push_back(get_state_info_message_id); container_to_service_message_id_[container_message_id].push_back(get_state_info_message_id);
} }
} }

View File

@ -165,9 +165,9 @@ class SessionConnection final
double force_send_at_ = 0; double force_send_at_ = 0;
struct ServiceQuery { struct ServiceQuery {
enum Type { GetStateInfo, ResendAnswer } type; enum Type { GetStateInfo, ResendAnswer } type_;
uint64 container_message_id; uint64 container_message_id_;
vector<int64> message_ids; vector<int64> msg_ids_;
}; };
vector<uint64> to_resend_answer_message_ids_; vector<uint64> to_resend_answer_message_ids_;
vector<uint64> to_cancel_answer_message_ids_; vector<uint64> to_cancel_answer_message_ids_;
@ -175,7 +175,7 @@ class SessionConnection final
FlatHashMap<uint64, ServiceQuery> service_queries_; FlatHashMap<uint64, ServiceQuery> service_queries_;
// nobody cleans up this map. But it should be really small. // nobody cleans up this map. But it should be really small.
FlatHashMap<uint64, vector<uint64>> container_to_service_msg_; FlatHashMap<uint64, vector<uint64>> container_to_service_message_id_;
double random_delay_ = 0; double random_delay_ = 0;
double last_read_at_ = 0; double last_read_at_ = 0;
@ -239,7 +239,7 @@ class SessionConnection final
Status on_packet(const MsgInfo &info, const mtproto_api::pong &pong) TD_WARN_UNUSED_RESULT; Status on_packet(const MsgInfo &info, const mtproto_api::pong &pong) TD_WARN_UNUSED_RESULT;
Status on_packet(const MsgInfo &info, const mtproto_api::future_salts &salts) TD_WARN_UNUSED_RESULT; Status on_packet(const MsgInfo &info, const mtproto_api::future_salts &salts) TD_WARN_UNUSED_RESULT;
Status on_msgs_state_info(const vector<int64> &message_ids, Slice info) TD_WARN_UNUSED_RESULT; Status on_msgs_state_info(const vector<int64> &msg_ids, Slice info) TD_WARN_UNUSED_RESULT;
Status on_packet(const MsgInfo &info, const mtproto_api::msgs_state_info &msgs_state_info) TD_WARN_UNUSED_RESULT; Status on_packet(const MsgInfo &info, const mtproto_api::msgs_state_info &msgs_state_info) TD_WARN_UNUSED_RESULT;
Status on_packet(const MsgInfo &info, const mtproto_api::msgs_all_info &msgs_all_info) TD_WARN_UNUSED_RESULT; Status on_packet(const MsgInfo &info, const mtproto_api::msgs_all_info &msgs_all_info) TD_WARN_UNUSED_RESULT;
Status on_packet(const MsgInfo &info, const mtproto_api::msg_detailed_info &msg_detailed_info) TD_WARN_UNUSED_RESULT; Status on_packet(const MsgInfo &info, const mtproto_api::msg_detailed_info &msg_detailed_info) TD_WARN_UNUSED_RESULT;