diff --git a/td/mtproto/CryptoStorer.h b/td/mtproto/CryptoStorer.h index 94faaf1bf..eaadd523f 100644 --- a/td/mtproto/CryptoStorer.h +++ b/td/mtproto/CryptoStorer.h @@ -206,8 +206,8 @@ class CryptoImpl { CryptoImpl(const vector &to_send, Slice header, vector &&to_ack, int64 ping_id, int ping_timeout, int max_delay, int max_after, int max_wait, int future_salt_n, vector get_info, vector resend, const vector &cancel, bool destroy_key, AuthData *auth_data, - uint64 *container_id, uint64 *get_info_message_id, uint64 *resend_message_id, uint64 *ping_message_id, - uint64 *parent_message_id) + uint64 *container_message_id, uint64 *get_info_message_id, uint64 *resend_message_id, + uint64 *ping_message_id, uint64 *parent_message_id) : query_storer_(to_send, header) , ack_empty_(to_ack.empty()) , ack_storer_(!ack_empty_, mtproto_api::msgs_ack(std::move(to_ack)), auth_data) @@ -251,7 +251,7 @@ class CryptoImpl { message_id_ = auth_data->next_message_id(Time::now_cached()); seq_no_ = auth_data->next_seq_no(false); - *container_id = message_id_; + *container_message_id = message_id_; *parent_message_id = message_id_; } else if (!to_send.empty()) { CHECK(to_send.size() == 1u); diff --git a/td/mtproto/PingConnection.cpp b/td/mtproto/PingConnection.cpp index 3e801144d..4c7878f5d 100644 --- a/td/mtproto/PingConnection.cpp +++ b/td/mtproto/PingConnection.cpp @@ -55,9 +55,11 @@ class PingConnectionReqPQ final } return raw_connection_->flush(AuthKey(), *this); } + bool was_pong() const final { return finish_time_ > 0; } + double rtt() const final { return finish_time_ - start_time_; } @@ -105,8 +107,10 @@ class PingConnectionPingPong final double rtt_; bool is_closed_{false}; Status status_; + void on_connected() final { } + void on_closed(Status status) final { is_closed_ = true; CHECK(status.is_error()); @@ -115,20 +119,25 @@ class PingConnectionPingPong final void on_auth_key_updated() final { } + void on_tmp_auth_key_updated() final { } + void on_server_salt_updated() final { } + void on_server_time_difference_updated(bool force) final { } void on_new_session_created(uint64 unique_id, uint64 first_message_id) final { } + void on_session_failed(Status status) final { } - void on_container_sent(uint64 container_id, vector msgs_id) final { + void on_container_sent(uint64 container_message_id, vector message_ids) final { } + Status on_pong() final { pong_cnt_++; if (pong_cnt_ == 1) { @@ -143,16 +152,21 @@ class PingConnectionPingPong final Status on_update(BufferSlice packet) final { return Status::OK(); } + void on_message_ack(uint64 id) final { } + Status on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) final { LOG(ERROR) << "Unexpected message"; return Status::OK(); } + void on_message_result_error(uint64 id, int code, string message) final { } + void on_message_failed(uint64 id, Status status) final { } + void on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size, int32 source) final { } @@ -160,12 +174,15 @@ class PingConnectionPingPong final LOG(ERROR) << "Destroy auth key"; return Status::OK(); } + PollableFdInfo &get_poll_info() final { return connection_->get_poll_info(); } + unique_ptr move_as_raw_connection() final { return connection_->move_as_raw_connection(); } + Status flush() final { if (was_pong()) { return Status::OK(); @@ -178,9 +195,11 @@ class PingConnectionPingPong final } return Status::OK(); } + bool was_pong() const final { return pong_cnt_ >= 2; } + double rtt() const final { return rtt_; } diff --git a/td/mtproto/SessionConnection.cpp b/td/mtproto/SessionConnection.cpp index eaa3d4417..a433cb8ea 100644 --- a/td/mtproto/SessionConnection.cpp +++ b/td/mtproto/SessionConnection.cpp @@ -212,10 +212,10 @@ Status SessionConnection::parse_message(TlParser &parser, MsgInfo *info, Slice * } Status SessionConnection::on_packet_container(const MsgInfo &info, Slice packet) { - auto old_container_id = container_id_; - container_id_ = info.message_id; + auto old_container_message_id = container_message_id_; + container_message_id_ = info.message_id; SCOPE_EXIT { - container_id_ = old_container_id; + container_message_id_ = old_container_message_id; }; TlParser parser(packet); @@ -223,7 +223,7 @@ Status SessionConnection::on_packet_container(const MsgInfo &info, Slice packet) if (parser.get_error()) { return Status::Error(PSLICE() << "Failed to parse mtproto_api::rpc_container: " << parser.get_error()); } - VLOG(mtproto) << "Receive container " << format::as_hex(container_id_) << " of size " << size; + VLOG(mtproto) << "Receive container " << format::as_hex(container_message_id_) << " of size " << size; for (int i = 0; i < size; i++) { TRY_STATUS(parse_packet(parser)); } @@ -516,8 +516,8 @@ Status SessionConnection::on_slice_packet(const MsgInfo &info, Slice packet) { auto get_update_description = [&] { return PSTRING() << "update from " << get_name() << " with auth key " << auth_data_->get_auth_key().id() - << " active for " << (Time::now() - created_at_) << " seconds in container " << container_id_ - << " from session " << auth_data_->get_session_id() << " with " << info + << " active for " << (Time::now() - created_at_) << " seconds in container " + << container_message_id_ << " from session " << auth_data_->get_session_id() << " with " << info << ", main_message_id = " << format::as_hex(main_message_id_) << " and original size = " << info.size; }; @@ -581,11 +581,11 @@ void SessionConnection::on_message_failed(uint64 id, Status status) { sent_destroy_auth_key_ = false; - if (id == last_ping_message_id_ || id == last_ping_container_id_) { + if (id == last_ping_message_id_ || id == last_ping_container_message_id_) { // restart ping immediately last_ping_at_ = 0; last_ping_message_id_ = 0; - last_ping_container_id_ = 0; + last_ping_container_message_id_ = 0; } auto cit = container_to_service_msg_.find(id); @@ -774,7 +774,7 @@ void SessionConnection::set_online(bool online_flag, bool is_main) { } last_ping_at_ = 0; last_ping_message_id_ = 0; - last_ping_container_id_ = 0; + last_ping_container_message_id_ = 0; } void SessionConnection::do_close(Status status) { @@ -893,7 +893,7 @@ bool SessionConnection::must_ping() const { void SessionConnection::flush_packet() { bool has_salt = auth_data_->has_salt(Time::now_cached()); // ping - uint64 container_id = 0; + uint64 container_message_id = 0; int64 ping_id = 0; if (has_salt && may_ping()) { ping_id = ++cur_ping_id_; @@ -992,7 +992,7 @@ void SessionConnection::flush_packet() { auto storer = PacketStorer( queries, auth_data_->get_header(), std::move(to_ack), ping_id, static_cast(ping_disconnect_delay() + 2.0), max_delay, max_after, max_wait, future_salt_n, to_get_state_info, to_resend_answer, to_cancel_answer, - destroy_auth_key, auth_data_, &container_id, &get_state_info_message_id, &resend_answer_message_id, + destroy_auth_key, auth_data_, &container_message_id, &get_state_info_message_id, &resend_answer_message_id, &ping_message_id, &parent_message_id); auto quick_ack_token = use_quick_ack ? parent_message_id : 0; @@ -1000,19 +1000,19 @@ void SessionConnection::flush_packet() { } if (resend_answer_message_id) { - service_queries_.emplace(resend_answer_message_id, - ServiceQuery{ServiceQuery::ResendAnswer, container_id, std::move(to_resend_answer)}); + service_queries_.emplace(resend_answer_message_id, ServiceQuery{ServiceQuery::ResendAnswer, container_message_id, + std::move(to_resend_answer)}); } if (get_state_info_message_id) { - service_queries_.emplace(get_state_info_message_id, - ServiceQuery{ServiceQuery::GetStateInfo, container_id, std::move(to_get_state_info)}); + service_queries_.emplace(get_state_info_message_id, ServiceQuery{ServiceQuery::GetStateInfo, container_message_id, + std::move(to_get_state_info)}); } if (ping_id != 0) { - last_ping_container_id_ = container_id; + last_ping_container_message_id_ = container_message_id; last_ping_message_id_ = ping_message_id; } - if (container_id != 0) { + if (container_message_id != 0) { auto message_ids = transform(queries, [](const MtprotoQuery &x) { return static_cast(x.message_id); }); // some acks may be lost here. Nobody will resend them if something goes wrong with query. @@ -1020,13 +1020,13 @@ void SessionConnection::flush_packet() { // // get future salt too. // So I will re-ask salt if have no answer in 60 second. - callback_->on_container_sent(container_id, std::move(message_ids)); + callback_->on_container_sent(container_message_id, std::move(message_ids)); if (resend_answer_message_id) { - container_to_service_msg_[container_id].push_back(resend_answer_message_id); + container_to_service_msg_[container_message_id].push_back(resend_answer_message_id); } if (get_state_info_message_id) { - container_to_service_msg_[container_id].push_back(get_state_info_message_id); + container_to_service_msg_[container_message_id].push_back(get_state_info_message_id); } } diff --git a/td/mtproto/SessionConnection.h b/td/mtproto/SessionConnection.h index 474008f60..d6263752a 100644 --- a/td/mtproto/SessionConnection.h +++ b/td/mtproto/SessionConnection.h @@ -98,7 +98,7 @@ class SessionConnection final virtual void on_new_session_created(uint64 unique_id, uint64 first_message_id) = 0; virtual void on_session_failed(Status status) = 0; - virtual void on_container_sent(uint64 container_id, vector msgs_id) = 0; + virtual void on_container_sent(uint64 container_message_id, vector message_ids) = 0; virtual Status on_pong() = 0; virtual Status on_update(BufferSlice packet) = 0; @@ -185,7 +185,7 @@ class SessionConnection final double real_last_pong_at_ = 0; uint64 cur_ping_id_ = 0; uint64 last_ping_message_id_ = 0; - uint64 last_ping_container_id_ = 0; + uint64 last_ping_container_message_id_ = 0; uint64 last_read_size_ = 0; uint64 last_write_size_ = 0; @@ -200,7 +200,7 @@ class SessionConnection final Mode mode_; bool connected_flag_ = false; - uint64 container_id_ = 0; + uint64 container_message_id_ = 0; uint64 main_message_id_ = 0; double created_at_ = 0;