Initialize ping_storer_ after other storers to ensure increasing message_id order in the container.

This commit is contained in:
levlam 2023-09-03 01:47:50 +03:00
parent fc98363d95
commit f1a064c249
2 changed files with 24 additions and 22 deletions

View File

@ -206,12 +206,11 @@ class CryptoImpl {
CryptoImpl(const vector<MtprotoQuery> &to_send, Slice header, vector<int64> &&to_ack, int64 ping_id, int ping_timeout, CryptoImpl(const vector<MtprotoQuery> &to_send, Slice header, vector<int64> &&to_ack, int64 ping_id, int ping_timeout,
int max_delay, int max_after, int max_wait, int future_salt_n, vector<int64> get_info, int max_delay, int max_after, int max_wait, int future_salt_n, vector<int64> get_info,
vector<int64> resend, const vector<int64> &cancel, bool destroy_key, AuthData *auth_data, vector<int64> resend, const vector<int64> &cancel, bool destroy_key, AuthData *auth_data,
uint64 *container_id, uint64 *get_info_id, uint64 *resend_id, uint64 *ping_message_id, uint64 *container_id, uint64 *get_info_message_id, uint64 *resend_message_id, uint64 *ping_message_id,
uint64 *parent_message_id) uint64 *parent_message_id)
: query_storer_(to_send, header) : query_storer_(to_send, header)
, ack_empty_(to_ack.empty()) , ack_empty_(to_ack.empty())
, ack_storer_(!ack_empty_, mtproto_api::msgs_ack(std::move(to_ack)), auth_data) , ack_storer_(!ack_empty_, mtproto_api::msgs_ack(std::move(to_ack)), auth_data)
, ping_storer_(ping_id != 0, mtproto_api::ping_delay_disconnect(ping_id, ping_timeout), auth_data)
, http_wait_storer_(max_delay >= 0, mtproto_api::http_wait(max_delay, max_after, max_wait), auth_data) , http_wait_storer_(max_delay >= 0, mtproto_api::http_wait(max_delay, max_after, max_wait), auth_data)
, get_future_salts_storer_(future_salt_n > 0, mtproto_api::get_future_salts(future_salt_n), auth_data) , get_future_salts_storer_(future_salt_n > 0, mtproto_api::get_future_salts(future_salt_n), auth_data)
, get_info_not_empty_(!get_info.empty()) , get_info_not_empty_(!get_info.empty())
@ -222,6 +221,7 @@ class CryptoImpl {
, cancel_cnt_(static_cast<int32>(cancel.size())) , cancel_cnt_(static_cast<int32>(cancel.size()))
, cancel_storer_(cancel_not_empty_, cancel, auth_data, true) , cancel_storer_(cancel_not_empty_, cancel, auth_data, true)
, destroy_key_storer_(destroy_key, mtproto_api::destroy_auth_key(), auth_data, true) , destroy_key_storer_(destroy_key, mtproto_api::destroy_auth_key(), auth_data, true)
, ping_storer_(ping_id != 0, mtproto_api::ping_delay_disconnect(ping_id, ping_timeout), auth_data)
, tmp_storer_(query_storer_, ack_storer_) , tmp_storer_(query_storer_, ack_storer_)
, tmp2_storer_(tmp_storer_, http_wait_storer_) , tmp2_storer_(tmp_storer_, http_wait_storer_)
, tmp3_storer_(tmp2_storer_, get_future_salts_storer_) , tmp3_storer_(tmp2_storer_, get_future_salts_storer_)
@ -235,11 +235,11 @@ class CryptoImpl {
resend_storer_.not_empty() + cancel_cnt_ + destroy_key_storer_.not_empty()) resend_storer_.not_empty() + cancel_cnt_ + destroy_key_storer_.not_empty())
, container_storer_(cnt_, concat_storer_) { , container_storer_(cnt_, concat_storer_) {
CHECK(cnt_ != 0); CHECK(cnt_ != 0);
if (get_info_storer_.not_empty() && get_info_id) { if (get_info_storer_.not_empty() && get_info_message_id) {
*get_info_id = get_info_storer_.get_message_id(); *get_info_message_id = get_info_storer_.get_message_id();
} }
if (resend_storer_.not_empty() && resend_id) { if (resend_storer_.not_empty() && resend_message_id) {
*resend_id = resend_storer_.get_message_id(); *resend_message_id = resend_storer_.get_message_id();
} }
if (ping_storer_.not_empty() && ping_message_id) { if (ping_storer_.not_empty() && ping_message_id) {
*ping_message_id = ping_storer_.get_message_id(); *ping_message_id = ping_storer_.get_message_id();
@ -328,7 +328,6 @@ class CryptoImpl {
PacketStorer<QueryVectorImpl> query_storer_; PacketStorer<QueryVectorImpl> query_storer_;
bool ack_empty_; bool ack_empty_;
PacketStorer<AckImpl> ack_storer_; PacketStorer<AckImpl> ack_storer_;
PacketStorer<PingImpl> ping_storer_;
PacketStorer<HttpWaitImpl> http_wait_storer_; PacketStorer<HttpWaitImpl> http_wait_storer_;
PacketStorer<GetFutureSaltsImpl> get_future_salts_storer_; PacketStorer<GetFutureSaltsImpl> get_future_salts_storer_;
bool get_info_not_empty_; bool get_info_not_empty_;
@ -339,6 +338,7 @@ class CryptoImpl {
int32 cancel_cnt_; int32 cancel_cnt_;
PacketStorer<CancelVectorImpl> cancel_storer_; PacketStorer<CancelVectorImpl> cancel_storer_;
PacketStorer<DestroyAuthKeyImpl> destroy_key_storer_; PacketStorer<DestroyAuthKeyImpl> destroy_key_storer_;
PacketStorer<PingImpl> ping_storer_;
ConcatStorer tmp_storer_; ConcatStorer tmp_storer_;
ConcatStorer tmp2_storer_; ConcatStorer tmp2_storer_;
ConcatStorer tmp3_storer_; ConcatStorer tmp3_storer_;

View File

@ -979,11 +979,11 @@ void SessionConnection::flush_packet() {
// no more than 8192 message identifiers per container.. // no more than 8192 message identifiers per container..
auto to_resend_answer = cut_tail(to_resend_answer_, 8192, "resend_answer"); auto to_resend_answer = cut_tail(to_resend_answer_, 8192, "resend_answer");
uint64 resend_answer_id = 0; uint64 resend_answer_message_id = 0;
CHECK(queries.size() <= 1020); CHECK(queries.size() <= 1020);
auto to_cancel_answer = cut_tail(to_cancel_answer_, 1020 - queries.size(), "cancel_answer"); auto to_cancel_answer = cut_tail(to_cancel_answer_, 1020 - queries.size(), "cancel_answer");
auto to_get_state_info = cut_tail(to_get_state_info_, 8192, "get_state_info"); auto to_get_state_info = cut_tail(to_get_state_info_, 8192, "get_state_info");
uint64 get_state_info_id = 0; uint64 get_state_info_message_id = 0;
auto to_ack = cut_tail(to_ack_, 8192, "ack"); auto to_ack = cut_tail(to_ack_, 8192, "ack");
uint64 ping_message_id = 0; uint64 ping_message_id = 0;
@ -993,21 +993,23 @@ void SessionConnection::flush_packet() {
{ {
// LOG(ERROR) << (auth_data_->get_header().empty() ? '-' : '+'); // LOG(ERROR) << (auth_data_->get_header().empty() ? '-' : '+');
uint64 parent_message_id = 0; uint64 parent_message_id = 0;
auto storer = PacketStorer<CryptoImpl>(queries, auth_data_->get_header(), std::move(to_ack), ping_id, auto storer = PacketStorer<CryptoImpl>(
static_cast<int>(ping_disconnect_delay() + 2.0), max_delay, max_after, queries, auth_data_->get_header(), std::move(to_ack), ping_id, static_cast<int>(ping_disconnect_delay() + 2.0),
max_wait, future_salt_n, to_get_state_info, to_resend_answer, max_delay, max_after, max_wait, future_salt_n, to_get_state_info, to_resend_answer, to_cancel_answer,
to_cancel_answer, destroy_auth_key, auth_data_, &container_id, destroy_auth_key, auth_data_, &container_id, &get_state_info_message_id, &resend_answer_message_id,
&get_state_info_id, &resend_answer_id, &ping_message_id, &parent_message_id); &ping_message_id, &parent_message_id);
auto quick_ack_token = use_quick_ack ? parent_message_id : 0; auto quick_ack_token = use_quick_ack ? parent_message_id : 0;
send_crypto(storer, quick_ack_token); send_crypto(storer, quick_ack_token);
} }
if (resend_answer_id) { if (resend_answer_message_id) {
service_queries_.emplace(resend_answer_id, ServiceQuery{ServiceQuery::ResendAnswer, std::move(to_resend_answer)}); service_queries_.emplace(resend_answer_message_id,
ServiceQuery{ServiceQuery::ResendAnswer, std::move(to_resend_answer)});
} }
if (get_state_info_id) { if (get_state_info_message_id) {
service_queries_.emplace(get_state_info_id, ServiceQuery{ServiceQuery::GetStateInfo, std::move(to_get_state_info)}); service_queries_.emplace(get_state_info_message_id,
ServiceQuery{ServiceQuery::GetStateInfo, std::move(to_get_state_info)});
} }
if (ping_id != 0) { if (ping_id != 0) {
last_ping_container_id_ = container_id; last_ping_container_id_ = container_id;
@ -1024,11 +1026,11 @@ void SessionConnection::flush_packet() {
// So I will re-ask salt if have no answer in 60 second. // So I will re-ask salt if have no answer in 60 second.
callback_->on_container_sent(container_id, std::move(message_ids)); callback_->on_container_sent(container_id, std::move(message_ids));
if (resend_answer_id) { if (resend_answer_message_id) {
container_to_service_msg_[container_id].push_back(resend_answer_id); container_to_service_msg_[container_id].push_back(resend_answer_message_id);
} }
if (get_state_info_id) { if (get_state_info_message_id) {
container_to_service_msg_[container_id].push_back(get_state_info_id); container_to_service_msg_[container_id].push_back(get_state_info_message_id);
} }
} }