diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 901c8ffd7..ba0d1e573 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -172,8 +172,8 @@ Session::Session(unique_ptr callback, std::shared_ptr callback_ = std::shared_ptr(callback.release()); - main_connection_.connection_id = 0; - long_poll_connection_.connection_id = 1; + main_connection_.connection_id_ = 0; + long_poll_connection_.connection_id_ = 1; if (is_cdn) { auth_data_.set_header(G()->mtproto_header().get_anonymous_header().str()); @@ -243,11 +243,11 @@ void Session::connection_online_update(bool force) { } connection_online_flag_ = new_connection_online_flag; VLOG(dc) << "Set connection_online " << connection_online_flag_; - if (main_connection_.connection) { - main_connection_.connection->set_online(connection_online_flag_, is_main_); + if (main_connection_.connection_) { + main_connection_.connection_->set_online(connection_online_flag_, is_main_); } - if (long_poll_connection_.connection) { - long_poll_connection_.connection->set_online(connection_online_flag_, is_main_); + if (long_poll_connection_.connection_) { + long_poll_connection_.connection_->set_online(connection_online_flag_, is_main_); } } @@ -422,8 +422,8 @@ void Session::raw_event(const Event::Raw &event) { return_query(std::move(query)); LOG(DEBUG) << "Drop answer " << tag("message_id", format::as_hex(message_id)); - if (main_connection_.state == ConnectionInfo::State::Ready) { - main_connection_.connection->cancel_answer(message_id); + if (main_connection_.state_ == ConnectionInfo::State::Ready) { + main_connection_.connection_->cancel_answer(message_id); } else { to_cancel_.push_back(message_id); } @@ -442,11 +442,11 @@ Status Session::on_pong() { constexpr int MAX_QUERY_TIMEOUT = 60; constexpr int MIN_CONNECTION_ACTIVE = 60; if (current_info_ == &main_connection_ && - Timestamp::at(current_info_->created_at + MIN_CONNECTION_ACTIVE).is_in_past()) { + Timestamp::at(current_info_->created_at_ + MIN_CONNECTION_ACTIVE).is_in_past()) { Status status; if (!unknown_queries_.empty()) { status = Status::Error(PSLICE() << "No state info for " << unknown_queries_.size() << " queries for " - << format::as_time(Time::now_cached() - current_info_->created_at)); + << format::as_time(Time::now_cached() - current_info_->created_at_)); } if (!sent_queries_list_.empty()) { for (auto it = sent_queries_list_.prev; it != &sent_queries_list_; it = it->prev) { @@ -493,16 +493,16 @@ void Session::on_closed(Status status) { if (!close_flag_ && is_main_) { connection_token_.reset(); } - auto raw_connection = current_info_->connection->move_as_raw_connection(); + auto raw_connection = current_info_->connection_->move_as_raw_connection(); Scheduler::unsubscribe_before_close(raw_connection->get_poll_info().get_pollable_fd_ref()); raw_connection->close(); if (status.is_error()) { LOG(WARNING) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " " - << current_info_->connection->get_name(); + << current_info_->connection_->get_name(); } else { LOG(INFO) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " " - << current_info_->connection->get_name(); + << current_info_->connection_->get_name(); } if (status.is_error() && status.code() == -404) { @@ -535,7 +535,7 @@ void Session::on_closed(Status status) { // resend all queries without ack for (auto it = sent_queries_.begin(); it != sent_queries_.end();) { - if (!it->second.ack && it->second.connection_id == current_info_->connection_id) { + if (!it->second.ack && it->second.connection_id == current_info_->connection_id_) { // container vector leak otherwise cleanup_container(it->first, &it->second); @@ -549,7 +549,7 @@ void Session::on_closed(Status status) { query->set_message_id(0); query->cancel_slot_.clear_event(); query->set_error(Status::Error(500, PSLICE() << "Session failed: " << status.message()), - current_info_->connection->get_name().str()); + current_info_->connection_->get_name().str()); return_query(std::move(query)); it = sent_queries_.erase(it); } else { @@ -561,8 +561,8 @@ void Session::on_closed(Status status) { } } - current_info_->connection.reset(); - current_info_->state = ConnectionInfo::State::Empty; + current_info_->connection_.reset(); + current_info_->state_ = ConnectionInfo::State::Empty; } void Session::on_session_created(uint64 unique_id, uint64 first_id) { @@ -846,7 +846,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message) cleanup_container(id, query_ptr); mark_as_known(id, query_ptr); - query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection->get_name().str()); + query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str()); query_ptr->query->set_message_id(0); query_ptr->query->cancel_slot_.clear_event(); return_query(std::move(query_ptr->query)); @@ -938,7 +938,7 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an << tag("answer_size", answer_size) << it->second.query; it->second.query->debug("Session: resend answer"); } - current_info_->connection->resend_answer(answer_id); + current_info_->connection_->resend_answer(answer_id); } } @@ -970,7 +970,7 @@ void Session::add_query(NetQueryPtr &&net_query) { void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_query, uint64 message_id) { net_query->debug("Session: trying to send to mtproto::connection"); - CHECK(info->state == ConnectionInfo::State::Ready); + CHECK(info->state_ == ConnectionInfo::State::Ready); current_info_ = info; if (net_query->update_is_ready()) { @@ -994,8 +994,8 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer bool immediately_fail_query = false; if (!immediately_fail_query) { auto r_message_id = - info->connection->send_query(net_query->query().clone(), net_query->gzip_flag() == NetQuery::GzipFlag::On, - message_id, invoke_after_id, static_cast(net_query->quick_ack_promise_)); + info->connection_->send_query(net_query->query().clone(), net_query->gzip_flag() == NetQuery::GzipFlag::On, + message_id, invoke_after_id, static_cast(net_query->quick_ack_promise_)); net_query->on_net_write(net_query->query().size()); @@ -1023,7 +1023,7 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer net_query->cancel_slot_.set_event(EventCreator::raw(actor_id(), message_id)); } auto status = sent_queries_.emplace( - message_id, Query{message_id, std::move(net_query), main_connection_.connection_id, Time::now_cached()}); + message_id, Query{message_id, std::move(net_query), main_connection_.connection_id_, Time::now_cached()}); sent_queries_list_.put(status.first->second.get_list_node()); if (!status.second) { LOG(FATAL) << "Duplicate message_id [message_id = " << message_id << "]"; @@ -1034,16 +1034,16 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer } void Session::connection_open(ConnectionInfo *info, bool ask_info) { - CHECK(info->state == ConnectionInfo::State::Empty); + CHECK(info->state_ == ConnectionInfo::State::Empty); if (!network_flag_) { return; } if (!auth_data_.has_auth_key(Time::now_cached())) { return; } - info->ask_info = ask_info; + info->ask_info_ = ask_info; - info->state = ConnectionInfo::State::Connecting; + info->state_ = ConnectionInfo::State::Connecting; info->cancellation_token_source_ = CancellationTokenSource{}; // NB: rely on constant location of info auto promise = PromiseCreator::cancellable_lambda( @@ -1064,7 +1064,7 @@ void Session::connection_open(ConnectionInfo *info, bool ask_info) { callback_->request_raw_connection(std::move(auth_data), std::move(promise)); } - info->wakeup_at = Time::now_cached() + 1000; + info->wakeup_at_ = Time::now_cached() + 1000; } void Session::connection_add(unique_ptr raw_connection) { @@ -1074,10 +1074,10 @@ void Session::connection_add(unique_ptr raw_connection) } void Session::connection_check_mode(ConnectionInfo *info) { - if (close_flag_ || info->state != ConnectionInfo::State::Ready) { + if (close_flag_ || info->state_ != ConnectionInfo::State::Ready) { return; } - if (info->mode != mode_) { + if (info->mode_ != mode_) { LOG(WARNING) << "Close connection because of outdated mode_"; connection_close(info); } @@ -1085,14 +1085,14 @@ void Session::connection_check_mode(ConnectionInfo *info) { void Session::connection_open_finish(ConnectionInfo *info, Result> r_raw_connection) { - if (close_flag_ || info->state != ConnectionInfo::State::Connecting) { + if (close_flag_ || info->state_ != ConnectionInfo::State::Connecting) { VLOG(dc) << "Ignore raw connection while closing"; return; } current_info_ = info; if (r_raw_connection.is_error()) { LOG(WARNING) << "Failed to open socket: " << r_raw_connection.error(); - info->state = ConnectionInfo::State::Empty; + info->state_ = ConnectionInfo::State::Empty; yield(); return; } @@ -1101,7 +1101,7 @@ void Session::connection_open_finish(ConnectionInfo *info, VLOG(dc) << "Receive raw connection " << raw_connection.get(); if (raw_connection->extra().extra != network_generation_) { LOG(WARNING) << "Got RawConnection with old network_generation"; - info->state = ConnectionInfo::State::Empty; + info->state_ = ConnectionInfo::State::Empty; yield(); return; } @@ -1111,10 +1111,10 @@ void Session::connection_open_finish(ConnectionInfo *info, if (mode_ != expected_mode) { VLOG(dc) << "Change mode " << mode_ << "--->" << expected_mode; mode_ = expected_mode; - if (info->connection_id == 1 && mode_ != Mode::Http) { + if (info->connection_id_ == 1 && mode_ != Mode::Http) { LOG(WARNING) << "Got tcp connection for long poll connection"; connection_add(std::move(raw_connection)); - info->state = ConnectionInfo::State::Empty; + info->state_ = ConnectionInfo::State::Empty; yield(); return; } @@ -1126,7 +1126,7 @@ void Session::connection_open_finish(ConnectionInfo *info, mode = mtproto::SessionConnection::Mode::Tcp; mode_name = Slice("Tcp"); } else { - if (info->connection_id == 0) { + if (info->connection_id_ == 0) { mode = mtproto::SessionConnection::Mode::Http; mode_name = Slice("Http"); } else { @@ -1136,28 +1136,28 @@ void Session::connection_open_finish(ConnectionInfo *info, } auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->extra().debug_str; LOG(INFO) << "Finished to open connection " << name; - info->connection = make_unique(mode, std::move(raw_connection), &auth_data_); + info->connection_ = make_unique(mode, std::move(raw_connection), &auth_data_); if (can_destroy_auth_key()) { - info->connection->destroy_key(); + info->connection_->destroy_key(); } - info->connection->set_online(connection_online_flag_, is_main_); - info->connection->set_name(name); - Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this)); - info->mode = mode_; - info->state = ConnectionInfo::State::Ready; - info->created_at = Time::now_cached(); - info->wakeup_at = Time::now_cached() + 10; + info->connection_->set_online(connection_online_flag_, is_main_); + info->connection_->set_name(name); + Scheduler::subscribe(info->connection_->get_poll_info().extract_pollable_fd(this)); + info->mode_ = mode_; + info->state_ = ConnectionInfo::State::Ready; + info->created_at_ = Time::now_cached(); + info->wakeup_at_ = Time::now_cached() + 10; if (unknown_queries_.size() > MAX_INFLIGHT_QUERIES) { LOG(ERROR) << "With current limits `Too much queries with unknown state` error must be impossible"; on_session_failed(Status::Error("Too much queries with unknown state")); return; } - if (info->ask_info) { + if (info->ask_info_) { for (auto &id : unknown_queries_) { - info->connection->get_state_info(id); + info->connection_->get_state_info(id); } for (auto &id : to_cancel_) { - info->connection->cancel_answer(id); + info->connection_->cancel_answer(id); } to_cancel_.clear(); } @@ -1165,18 +1165,18 @@ void Session::connection_open_finish(ConnectionInfo *info, } void Session::connection_flush(ConnectionInfo *info) { - CHECK(info->state == ConnectionInfo::State::Ready); + CHECK(info->state_ == ConnectionInfo::State::Ready); current_info_ = info; - info->wakeup_at = info->connection->flush(static_cast(this)); + info->wakeup_at_ = info->connection_->flush(static_cast(this)); } void Session::connection_close(ConnectionInfo *info) { current_info_ = info; - if (info->state != ConnectionInfo::State::Ready) { + if (info->state_ != ConnectionInfo::State::Ready) { return; } - info->connection->force_close(static_cast(this)); - CHECK(info->state == ConnectionInfo::State::Empty); + info->connection_->force_close(static_cast(this)); + CHECK(info->state_ == ConnectionInfo::State::Empty); } bool Session::need_send_check_main_key() const { @@ -1191,7 +1191,7 @@ bool Session::connection_send_check_main_key(ConnectionInfo *info) { if (key_id == being_checked_main_auth_key_id_) { return false; } - CHECK(info->state != ConnectionInfo::State::Empty); + CHECK(info->state_ != ConnectionInfo::State::Empty); LOG(INFO) << "Check main key"; being_checked_main_auth_key_id_ = key_id; last_check_query_id_ = UniqueId::next(UniqueId::BindKey); @@ -1215,7 +1215,7 @@ bool Session::need_send_query() const { } bool Session::connection_send_bind_key(ConnectionInfo *info) { - CHECK(info->state != ConnectionInfo::State::Empty); + CHECK(info->state_ != ConnectionInfo::State::Empty); uint64 key_id = auth_data_.get_tmp_auth_key().id(); if (key_id == being_binded_tmp_auth_key_id_) { return false; @@ -1228,7 +1228,7 @@ bool Session::connection_send_bind_key(ConnectionInfo *info) { auto expires_at = static_cast(auth_data_.get_server_time(auth_data_.get_tmp_auth_key().expires_at())); int64 message_id; BufferSlice encrypted; - std::tie(message_id, encrypted) = info->connection->encrypted_bind(perm_auth_key_id, nonce, expires_at); + std::tie(message_id, encrypted) = info->connection_->encrypted_bind(perm_auth_key_id, nonce, expires_at); LOG(INFO) << "Bind key: " << tag("tmp", key_id) << tag("perm", static_cast(perm_auth_key_id)); NetQueryPtr query = G()->net_query_creator().create( @@ -1367,8 +1367,8 @@ void Session::loop() { connection_online_update(); double wakeup_at = 0; - main_connection_.wakeup_at = 0; - long_poll_connection_.wakeup_at = 0; + main_connection_.wakeup_at_ = 0; + long_poll_connection_.wakeup_at_ = 0; // NB: order is crucial. First long_poll_connection, then main_connection // Otherwise queries could be sent with big delay @@ -1376,20 +1376,20 @@ void Session::loop() { connection_check_mode(&main_connection_); connection_check_mode(&long_poll_connection_); if (mode_ == Mode::Http) { - if (long_poll_connection_.state == ConnectionInfo::State::Ready) { + if (long_poll_connection_.state_ == ConnectionInfo::State::Ready) { connection_flush(&long_poll_connection_); } - if (!close_flag_ && long_poll_connection_.state == ConnectionInfo::State::Empty) { + if (!close_flag_ && long_poll_connection_.state_ == ConnectionInfo::State::Empty) { connection_open(&long_poll_connection_); } - relax_timeout_at(&wakeup_at, long_poll_connection_.wakeup_at); + relax_timeout_at(&wakeup_at, long_poll_connection_.wakeup_at_); } - if (main_connection_.state == ConnectionInfo::State::Ready) { + if (main_connection_.state_ == ConnectionInfo::State::Ready) { // do not send queries before we have key and e.t.c // do not send queries before tmp_key is bound bool need_flush = true; - while (main_connection_.state == ConnectionInfo::State::Ready) { + while (main_connection_.state_ == ConnectionInfo::State::Ready) { if (auth_data_.is_ready(Time::now_cached())) { if (need_send_query()) { while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) { @@ -1416,11 +1416,11 @@ void Session::loop() { } } } - if (!close_flag_ && main_connection_.state == ConnectionInfo::State::Empty) { + if (!close_flag_ && main_connection_.state_ == ConnectionInfo::State::Empty) { connection_open(&main_connection_, true /*send ask_info*/); } - relax_timeout_at(&wakeup_at, main_connection_.wakeup_at); + relax_timeout_at(&wakeup_at, main_connection_.wakeup_at_); double wakeup_in = 0; if (wakeup_at != 0) { diff --git a/td/telegram/net/Session.h b/td/telegram/net/Session.h index 93ff84e08..1ad277449 100644 --- a/td/telegram/net/Session.h +++ b/td/telegram/net/Session.h @@ -140,14 +140,14 @@ class Session final ListNode sent_queries_list_; struct ConnectionInfo { - int8 connection_id = 0; - Mode mode = Mode::Tcp; - enum class State : int8 { Empty, Connecting, Ready } state = State::Empty; + int8 connection_id_ = 0; + Mode mode_ = Mode::Tcp; + enum class State : int8 { Empty, Connecting, Ready } state_ = State::Empty; CancellationTokenSource cancellation_token_source_; - unique_ptr connection; - bool ask_info = false; - double wakeup_at = 0; - double created_at = 0; + unique_ptr connection_; + bool ask_info_ = false; + double wakeup_at_ = 0; + double created_at_ = 0; }; ConnectionInfo *current_info_;