Unify Session::ConnectionInfo field name style.

This commit is contained in:
levlam 2021-11-22 14:36:59 +03:00
parent 21a7d147f5
commit 0c554fbe04
2 changed files with 71 additions and 71 deletions

View File

@ -172,8 +172,8 @@ Session::Session(unique_ptr<Callback> callback, std::shared_ptr<AuthDataShared>
callback_ = std::shared_ptr<Callback>(callback.release());
main_connection_.connection_id = 0;
long_poll_connection_.connection_id = 1;
main_connection_.connection_id_ = 0;
long_poll_connection_.connection_id_ = 1;
if (is_cdn) {
auth_data_.set_header(G()->mtproto_header().get_anonymous_header().str());
@ -243,11 +243,11 @@ void Session::connection_online_update(bool force) {
}
connection_online_flag_ = new_connection_online_flag;
VLOG(dc) << "Set connection_online " << connection_online_flag_;
if (main_connection_.connection) {
main_connection_.connection->set_online(connection_online_flag_, is_main_);
if (main_connection_.connection_) {
main_connection_.connection_->set_online(connection_online_flag_, is_main_);
}
if (long_poll_connection_.connection) {
long_poll_connection_.connection->set_online(connection_online_flag_, is_main_);
if (long_poll_connection_.connection_) {
long_poll_connection_.connection_->set_online(connection_online_flag_, is_main_);
}
}
@ -422,8 +422,8 @@ void Session::raw_event(const Event::Raw &event) {
return_query(std::move(query));
LOG(DEBUG) << "Drop answer " << tag("message_id", format::as_hex(message_id));
if (main_connection_.state == ConnectionInfo::State::Ready) {
main_connection_.connection->cancel_answer(message_id);
if (main_connection_.state_ == ConnectionInfo::State::Ready) {
main_connection_.connection_->cancel_answer(message_id);
} else {
to_cancel_.push_back(message_id);
}
@ -442,11 +442,11 @@ Status Session::on_pong() {
constexpr int MAX_QUERY_TIMEOUT = 60;
constexpr int MIN_CONNECTION_ACTIVE = 60;
if (current_info_ == &main_connection_ &&
Timestamp::at(current_info_->created_at + MIN_CONNECTION_ACTIVE).is_in_past()) {
Timestamp::at(current_info_->created_at_ + MIN_CONNECTION_ACTIVE).is_in_past()) {
Status status;
if (!unknown_queries_.empty()) {
status = Status::Error(PSLICE() << "No state info for " << unknown_queries_.size() << " queries for "
<< format::as_time(Time::now_cached() - current_info_->created_at));
<< format::as_time(Time::now_cached() - current_info_->created_at_));
}
if (!sent_queries_list_.empty()) {
for (auto it = sent_queries_list_.prev; it != &sent_queries_list_; it = it->prev) {
@ -493,16 +493,16 @@ void Session::on_closed(Status status) {
if (!close_flag_ && is_main_) {
connection_token_.reset();
}
auto raw_connection = current_info_->connection->move_as_raw_connection();
auto raw_connection = current_info_->connection_->move_as_raw_connection();
Scheduler::unsubscribe_before_close(raw_connection->get_poll_info().get_pollable_fd_ref());
raw_connection->close();
if (status.is_error()) {
LOG(WARNING) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " "
<< current_info_->connection->get_name();
<< current_info_->connection_->get_name();
} else {
LOG(INFO) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " "
<< current_info_->connection->get_name();
<< current_info_->connection_->get_name();
}
if (status.is_error() && status.code() == -404) {
@ -535,7 +535,7 @@ void Session::on_closed(Status status) {
// resend all queries without ack
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
if (!it->second.ack && it->second.connection_id == current_info_->connection_id) {
if (!it->second.ack && it->second.connection_id == current_info_->connection_id_) {
// container vector leak otherwise
cleanup_container(it->first, &it->second);
@ -549,7 +549,7 @@ void Session::on_closed(Status status) {
query->set_message_id(0);
query->cancel_slot_.clear_event();
query->set_error(Status::Error(500, PSLICE() << "Session failed: " << status.message()),
current_info_->connection->get_name().str());
current_info_->connection_->get_name().str());
return_query(std::move(query));
it = sent_queries_.erase(it);
} else {
@ -561,8 +561,8 @@ void Session::on_closed(Status status) {
}
}
current_info_->connection.reset();
current_info_->state = ConnectionInfo::State::Empty;
current_info_->connection_.reset();
current_info_->state_ = ConnectionInfo::State::Empty;
}
void Session::on_session_created(uint64 unique_id, uint64 first_id) {
@ -846,7 +846,7 @@ void Session::on_message_result_error(uint64 id, int error_code, string message)
cleanup_container(id, query_ptr);
mark_as_known(id, query_ptr);
query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection->get_name().str());
query_ptr->query->set_error(Status::Error(error_code, message), current_info_->connection_->get_name().str());
query_ptr->query->set_message_id(0);
query_ptr->query->cancel_slot_.clear_event();
return_query(std::move(query_ptr->query));
@ -938,7 +938,7 @@ void Session::on_message_info(uint64 id, int32 state, uint64 answer_id, int32 an
<< tag("answer_size", answer_size) << it->second.query;
it->second.query->debug("Session: resend answer");
}
current_info_->connection->resend_answer(answer_id);
current_info_->connection_->resend_answer(answer_id);
}
}
@ -970,7 +970,7 @@ void Session::add_query(NetQueryPtr &&net_query) {
void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_query, uint64 message_id) {
net_query->debug("Session: trying to send to mtproto::connection");
CHECK(info->state == ConnectionInfo::State::Ready);
CHECK(info->state_ == ConnectionInfo::State::Ready);
current_info_ = info;
if (net_query->update_is_ready()) {
@ -994,8 +994,8 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
bool immediately_fail_query = false;
if (!immediately_fail_query) {
auto r_message_id =
info->connection->send_query(net_query->query().clone(), net_query->gzip_flag() == NetQuery::GzipFlag::On,
message_id, invoke_after_id, static_cast<bool>(net_query->quick_ack_promise_));
info->connection_->send_query(net_query->query().clone(), net_query->gzip_flag() == NetQuery::GzipFlag::On,
message_id, invoke_after_id, static_cast<bool>(net_query->quick_ack_promise_));
net_query->on_net_write(net_query->query().size());
@ -1023,7 +1023,7 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
net_query->cancel_slot_.set_event(EventCreator::raw(actor_id(), message_id));
}
auto status = sent_queries_.emplace(
message_id, Query{message_id, std::move(net_query), main_connection_.connection_id, Time::now_cached()});
message_id, Query{message_id, std::move(net_query), main_connection_.connection_id_, Time::now_cached()});
sent_queries_list_.put(status.first->second.get_list_node());
if (!status.second) {
LOG(FATAL) << "Duplicate message_id [message_id = " << message_id << "]";
@ -1034,16 +1034,16 @@ void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_quer
}
void Session::connection_open(ConnectionInfo *info, bool ask_info) {
CHECK(info->state == ConnectionInfo::State::Empty);
CHECK(info->state_ == ConnectionInfo::State::Empty);
if (!network_flag_) {
return;
}
if (!auth_data_.has_auth_key(Time::now_cached())) {
return;
}
info->ask_info = ask_info;
info->ask_info_ = ask_info;
info->state = ConnectionInfo::State::Connecting;
info->state_ = ConnectionInfo::State::Connecting;
info->cancellation_token_source_ = CancellationTokenSource{};
// NB: rely on constant location of info
auto promise = PromiseCreator::cancellable_lambda(
@ -1064,7 +1064,7 @@ void Session::connection_open(ConnectionInfo *info, bool ask_info) {
callback_->request_raw_connection(std::move(auth_data), std::move(promise));
}
info->wakeup_at = Time::now_cached() + 1000;
info->wakeup_at_ = Time::now_cached() + 1000;
}
void Session::connection_add(unique_ptr<mtproto::RawConnection> raw_connection) {
@ -1074,10 +1074,10 @@ void Session::connection_add(unique_ptr<mtproto::RawConnection> raw_connection)
}
void Session::connection_check_mode(ConnectionInfo *info) {
if (close_flag_ || info->state != ConnectionInfo::State::Ready) {
if (close_flag_ || info->state_ != ConnectionInfo::State::Ready) {
return;
}
if (info->mode != mode_) {
if (info->mode_ != mode_) {
LOG(WARNING) << "Close connection because of outdated mode_";
connection_close(info);
}
@ -1085,14 +1085,14 @@ void Session::connection_check_mode(ConnectionInfo *info) {
void Session::connection_open_finish(ConnectionInfo *info,
Result<unique_ptr<mtproto::RawConnection>> r_raw_connection) {
if (close_flag_ || info->state != ConnectionInfo::State::Connecting) {
if (close_flag_ || info->state_ != ConnectionInfo::State::Connecting) {
VLOG(dc) << "Ignore raw connection while closing";
return;
}
current_info_ = info;
if (r_raw_connection.is_error()) {
LOG(WARNING) << "Failed to open socket: " << r_raw_connection.error();
info->state = ConnectionInfo::State::Empty;
info->state_ = ConnectionInfo::State::Empty;
yield();
return;
}
@ -1101,7 +1101,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
VLOG(dc) << "Receive raw connection " << raw_connection.get();
if (raw_connection->extra().extra != network_generation_) {
LOG(WARNING) << "Got RawConnection with old network_generation";
info->state = ConnectionInfo::State::Empty;
info->state_ = ConnectionInfo::State::Empty;
yield();
return;
}
@ -1111,10 +1111,10 @@ void Session::connection_open_finish(ConnectionInfo *info,
if (mode_ != expected_mode) {
VLOG(dc) << "Change mode " << mode_ << "--->" << expected_mode;
mode_ = expected_mode;
if (info->connection_id == 1 && mode_ != Mode::Http) {
if (info->connection_id_ == 1 && mode_ != Mode::Http) {
LOG(WARNING) << "Got tcp connection for long poll connection";
connection_add(std::move(raw_connection));
info->state = ConnectionInfo::State::Empty;
info->state_ = ConnectionInfo::State::Empty;
yield();
return;
}
@ -1126,7 +1126,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
mode = mtproto::SessionConnection::Mode::Tcp;
mode_name = Slice("Tcp");
} else {
if (info->connection_id == 0) {
if (info->connection_id_ == 0) {
mode = mtproto::SessionConnection::Mode::Http;
mode_name = Slice("Http");
} else {
@ -1136,28 +1136,28 @@ void Session::connection_open_finish(ConnectionInfo *info,
}
auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->extra().debug_str;
LOG(INFO) << "Finished to open connection " << name;
info->connection = make_unique<mtproto::SessionConnection>(mode, std::move(raw_connection), &auth_data_);
info->connection_ = make_unique<mtproto::SessionConnection>(mode, std::move(raw_connection), &auth_data_);
if (can_destroy_auth_key()) {
info->connection->destroy_key();
info->connection_->destroy_key();
}
info->connection->set_online(connection_online_flag_, is_main_);
info->connection->set_name(name);
Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this));
info->mode = mode_;
info->state = ConnectionInfo::State::Ready;
info->created_at = Time::now_cached();
info->wakeup_at = Time::now_cached() + 10;
info->connection_->set_online(connection_online_flag_, is_main_);
info->connection_->set_name(name);
Scheduler::subscribe(info->connection_->get_poll_info().extract_pollable_fd(this));
info->mode_ = mode_;
info->state_ = ConnectionInfo::State::Ready;
info->created_at_ = Time::now_cached();
info->wakeup_at_ = Time::now_cached() + 10;
if (unknown_queries_.size() > MAX_INFLIGHT_QUERIES) {
LOG(ERROR) << "With current limits `Too much queries with unknown state` error must be impossible";
on_session_failed(Status::Error("Too much queries with unknown state"));
return;
}
if (info->ask_info) {
if (info->ask_info_) {
for (auto &id : unknown_queries_) {
info->connection->get_state_info(id);
info->connection_->get_state_info(id);
}
for (auto &id : to_cancel_) {
info->connection->cancel_answer(id);
info->connection_->cancel_answer(id);
}
to_cancel_.clear();
}
@ -1165,18 +1165,18 @@ void Session::connection_open_finish(ConnectionInfo *info,
}
void Session::connection_flush(ConnectionInfo *info) {
CHECK(info->state == ConnectionInfo::State::Ready);
CHECK(info->state_ == ConnectionInfo::State::Ready);
current_info_ = info;
info->wakeup_at = info->connection->flush(static_cast<mtproto::SessionConnection::Callback *>(this));
info->wakeup_at_ = info->connection_->flush(static_cast<mtproto::SessionConnection::Callback *>(this));
}
void Session::connection_close(ConnectionInfo *info) {
current_info_ = info;
if (info->state != ConnectionInfo::State::Ready) {
if (info->state_ != ConnectionInfo::State::Ready) {
return;
}
info->connection->force_close(static_cast<mtproto::SessionConnection::Callback *>(this));
CHECK(info->state == ConnectionInfo::State::Empty);
info->connection_->force_close(static_cast<mtproto::SessionConnection::Callback *>(this));
CHECK(info->state_ == ConnectionInfo::State::Empty);
}
bool Session::need_send_check_main_key() const {
@ -1191,7 +1191,7 @@ bool Session::connection_send_check_main_key(ConnectionInfo *info) {
if (key_id == being_checked_main_auth_key_id_) {
return false;
}
CHECK(info->state != ConnectionInfo::State::Empty);
CHECK(info->state_ != ConnectionInfo::State::Empty);
LOG(INFO) << "Check main key";
being_checked_main_auth_key_id_ = key_id;
last_check_query_id_ = UniqueId::next(UniqueId::BindKey);
@ -1215,7 +1215,7 @@ bool Session::need_send_query() const {
}
bool Session::connection_send_bind_key(ConnectionInfo *info) {
CHECK(info->state != ConnectionInfo::State::Empty);
CHECK(info->state_ != ConnectionInfo::State::Empty);
uint64 key_id = auth_data_.get_tmp_auth_key().id();
if (key_id == being_binded_tmp_auth_key_id_) {
return false;
@ -1228,7 +1228,7 @@ bool Session::connection_send_bind_key(ConnectionInfo *info) {
auto expires_at = static_cast<int32>(auth_data_.get_server_time(auth_data_.get_tmp_auth_key().expires_at()));
int64 message_id;
BufferSlice encrypted;
std::tie(message_id, encrypted) = info->connection->encrypted_bind(perm_auth_key_id, nonce, expires_at);
std::tie(message_id, encrypted) = info->connection_->encrypted_bind(perm_auth_key_id, nonce, expires_at);
LOG(INFO) << "Bind key: " << tag("tmp", key_id) << tag("perm", static_cast<uint64>(perm_auth_key_id));
NetQueryPtr query = G()->net_query_creator().create(
@ -1367,8 +1367,8 @@ void Session::loop() {
connection_online_update();
double wakeup_at = 0;
main_connection_.wakeup_at = 0;
long_poll_connection_.wakeup_at = 0;
main_connection_.wakeup_at_ = 0;
long_poll_connection_.wakeup_at_ = 0;
// NB: order is crucial. First long_poll_connection, then main_connection
// Otherwise queries could be sent with big delay
@ -1376,20 +1376,20 @@ void Session::loop() {
connection_check_mode(&main_connection_);
connection_check_mode(&long_poll_connection_);
if (mode_ == Mode::Http) {
if (long_poll_connection_.state == ConnectionInfo::State::Ready) {
if (long_poll_connection_.state_ == ConnectionInfo::State::Ready) {
connection_flush(&long_poll_connection_);
}
if (!close_flag_ && long_poll_connection_.state == ConnectionInfo::State::Empty) {
if (!close_flag_ && long_poll_connection_.state_ == ConnectionInfo::State::Empty) {
connection_open(&long_poll_connection_);
}
relax_timeout_at(&wakeup_at, long_poll_connection_.wakeup_at);
relax_timeout_at(&wakeup_at, long_poll_connection_.wakeup_at_);
}
if (main_connection_.state == ConnectionInfo::State::Ready) {
if (main_connection_.state_ == ConnectionInfo::State::Ready) {
// do not send queries before we have key and e.t.c
// do not send queries before tmp_key is bound
bool need_flush = true;
while (main_connection_.state == ConnectionInfo::State::Ready) {
while (main_connection_.state_ == ConnectionInfo::State::Ready) {
if (auth_data_.is_ready(Time::now_cached())) {
if (need_send_query()) {
while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) {
@ -1416,11 +1416,11 @@ void Session::loop() {
}
}
}
if (!close_flag_ && main_connection_.state == ConnectionInfo::State::Empty) {
if (!close_flag_ && main_connection_.state_ == ConnectionInfo::State::Empty) {
connection_open(&main_connection_, true /*send ask_info*/);
}
relax_timeout_at(&wakeup_at, main_connection_.wakeup_at);
relax_timeout_at(&wakeup_at, main_connection_.wakeup_at_);
double wakeup_in = 0;
if (wakeup_at != 0) {

View File

@ -140,14 +140,14 @@ class Session final
ListNode sent_queries_list_;
struct ConnectionInfo {
int8 connection_id = 0;
Mode mode = Mode::Tcp;
enum class State : int8 { Empty, Connecting, Ready } state = State::Empty;
int8 connection_id_ = 0;
Mode mode_ = Mode::Tcp;
enum class State : int8 { Empty, Connecting, Ready } state_ = State::Empty;
CancellationTokenSource cancellation_token_source_;
unique_ptr<mtproto::SessionConnection> connection;
bool ask_info = false;
double wakeup_at = 0;
double created_at = 0;
unique_ptr<mtproto::SessionConnection> connection_;
bool ask_info_ = false;
double wakeup_at_ = 0;
double created_at_ = 0;
};
ConnectionInfo *current_info_;