Session: set_online for nonmain connections

GitOrigin-RevId: 9bcfcdbaaf2440a307f221ab07aa650c449c3d26
This commit is contained in:
Arseny Smirnov 2018-11-20 16:07:27 +04:00
parent fffe644aaa
commit c6dd53a76e
5 changed files with 50 additions and 26 deletions

View File

@ -58,8 +58,11 @@ uint64 RawConnection::send_no_crypto(const Storer &storer) {
Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) { Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) {
auto r = socket_fd_.flush_read(); auto r = socket_fd_.flush_read();
if (r.is_ok() && stats_callback_) { if (r.is_ok()) {
stats_callback_->on_read(r.ok()); if (stats_callback_) {
stats_callback_->on_read(r.ok());
}
callback.on_read(r.ok());
} }
while (transport_->can_read()) { while (transport_->can_read()) {
BufferSlice packet; BufferSlice packet;
@ -73,6 +76,10 @@ Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) {
CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin())) CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin()))
<< packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size; << packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size;
if (wait_size != 0) { if (wait_size != 0) {
constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024;
if (wait_size > MAX_PACKET_SIZE) {
return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size);
}
break; break;
} }

View File

@ -83,6 +83,8 @@ class RawConnection {
virtual Status before_write() { virtual Status before_write() {
return Status::OK(); return Status::OK();
} }
virtual void on_read(size_t size) {
}
}; };
// NB: After first returned error, all subsequent calls will return error too. // NB: After first returned error, all subsequent calls will return error too.

View File

@ -680,6 +680,11 @@ Status SessionConnection::on_quick_ack(uint64 quick_ack_token) {
callback_->on_message_ack(quick_ack_token); callback_->on_message_ack(quick_ack_token);
return Status::OK(); return Status::OK();
} }
void SessionConnection::on_read(size_t size) {
last_read_at_ = Time::now_cached();
}
SessionConnection::SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data, SessionConnection::SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data,
DhCallback *dh_callback) DhCallback *dh_callback)
: raw_connection_(std::move(raw_connection)), auth_data_(auth_data), dh_callback_(dh_callback) { : raw_connection_(std::move(raw_connection)), auth_data_(auth_data), dh_callback_(dh_callback) {
@ -695,16 +700,21 @@ PollableFdInfo &SessionConnection::get_poll_info() {
Status SessionConnection::init() { Status SessionConnection::init() {
CHECK(state_ == Init); CHECK(state_ == Init);
last_pong_at_ = Time::now_cached(); last_pong_at_ = Time::now_cached();
last_read_at_ = Time::now_cached();
state_ = Run; state_ = Run;
return Status::OK(); return Status::OK();
} }
void SessionConnection::set_online(bool online_flag) { void SessionConnection::set_online(bool online_flag, bool is_main) {
online_flag_ = online_flag; online_flag_ = online_flag;
is_main_ = is_main;
auto now = Time::now();
if (online_flag_) { if (online_flag_) {
last_pong_at_ = Time::now() - ping_disconnect_delay() + rtt(); last_pong_at_ = now - ping_disconnect_delay() + rtt();
last_read_at_ = now - read_disconnect_delay() + rtt();
} else { } else {
last_pong_at_ = Time::now(); last_pong_at_ = now;
last_read_at_ = now;
} }
last_ping_at_ = 0; last_ping_at_ = 0;
last_ping_message_id_ = 0; last_ping_message_id_ = 0;
@ -829,7 +839,8 @@ void SessionConnection::flush_packet() {
if (mode_ == Mode::HttpLongPoll) { if (mode_ == Mode::HttpLongPoll) {
max_delay = HTTP_MAX_DELAY; max_delay = HTTP_MAX_DELAY;
max_after = HTTP_MAX_AFTER; max_after = HTTP_MAX_AFTER;
auto time_to_disconnect = ping_disconnect_delay() + last_pong_at_ - Time::now_cached(); auto time_to_disconnect =
std::min(ping_disconnect_delay() + last_pong_at_, read_disconnect_delay() + last_read_at_) - Time::now_cached();
max_wait = min(http_max_wait(), static_cast<int>(1000 * max(0.1, time_to_disconnect - rtt()))); max_wait = min(http_max_wait(), static_cast<int>(1000 * max(0.1, time_to_disconnect - rtt())));
} else if (mode_ == Mode::Http) { } else if (mode_ == Mode::Http) {
max_delay = HTTP_MAX_DELAY; max_delay = HTTP_MAX_DELAY;
@ -971,7 +982,13 @@ Status SessionConnection::do_flush() {
// check last pong // check last pong
if (last_pong_at_ != 0 && last_pong_at_ + ping_disconnect_delay() < Time::now_cached()) { if (last_pong_at_ != 0 && last_pong_at_ + ping_disconnect_delay() < Time::now_cached()) {
raw_connection_->stats_callback()->on_error(); raw_connection_->stats_callback()->on_error();
return Status::Error("No pong :("); return Status::Error(PSLICE() << "No pong :( " << tag("rtt", rtt()) << tag("delay", ping_disconnect_delay()));
}
// check last pong
if (last_read_at_ != 0 && last_read_at_ + read_disconnect_delay() < Time::now_cached()) {
raw_connection_->stats_callback()->on_error();
return Status::Error("No read :(");
} }
return Status::OK(); return Status::OK();
@ -992,6 +1009,7 @@ double SessionConnection::flush(SessionConnection::Callback *callback) {
// 1. close connection after PING_DISCONNECT_DELAY after last_pong. // 1. close connection after PING_DISCONNECT_DELAY after last_pong.
// 2. the one returned by must_flush_packet // 2. the one returned by must_flush_packet
relax_timeout_at(&wakeup_at_, last_pong_at_ + ping_disconnect_delay() + 0.002); relax_timeout_at(&wakeup_at_, last_pong_at_ + ping_disconnect_delay() + 0.002);
relax_timeout_at(&wakeup_at_, last_read_at_ + read_disconnect_delay() + 0.002);
// CHECK(wakeup_at > Time::now_cached()); // CHECK(wakeup_at > Time::now_cached());
relax_timeout_at(&wakeup_at_, flush_packet_at_); relax_timeout_at(&wakeup_at_, flush_packet_at_);

View File

@ -79,7 +79,7 @@ class SessionConnection
void cancel_answer(int64 message_id); void cancel_answer(int64 message_id);
void destroy_key(); void destroy_key();
void set_online(bool online_flag); void set_online(bool online_flag, bool is_main);
// Callback // Callback
class Callback { class Callback {
@ -124,13 +124,18 @@ class SessionConnection
static constexpr double RESEND_ANSWER_DELAY = 0.001; // 0.001s static constexpr double RESEND_ANSWER_DELAY = 0.001; // 0.001s
bool online_flag_ = false; bool online_flag_ = false;
bool is_main_ = false;
int rtt() const { int rtt() const {
return max(2, static_cast<int>(raw_connection_->rtt_ * 1.5 + 1)); return max(2, static_cast<int>(raw_connection_->rtt_ * 1.5 + 1));
} }
int32 read_disconnect_delay() {
return online_flag_ ? rtt() * 7 / 2 : 135;
}
int32 ping_disconnect_delay() const { int32 ping_disconnect_delay() const {
return online_flag_ ? rtt() * 5 / 2 : 135; return (online_flag_ && is_main_) ? rtt() * 5 / 2 : 135;
} }
int32 ping_may_delay() const { int32 ping_may_delay() const {
@ -164,6 +169,7 @@ class SessionConnection
// nobody cleans up this map. But it should be really small. // nobody cleans up this map. But it should be really small.
std::unordered_map<uint64, std::vector<uint64>> container_to_service_msg_; std::unordered_map<uint64, std::vector<uint64>> container_to_service_msg_;
double last_read_at_ = 0;
double last_ping_at_ = 0; double last_ping_at_ = 0;
double last_pong_at_ = 0; double last_pong_at_ = 0;
int64 cur_ping_id_ = 0; int64 cur_ping_id_ = 0;
@ -257,6 +263,7 @@ class SessionConnection
Status before_write() override TD_WARN_UNUSED_RESULT; Status before_write() override TD_WARN_UNUSED_RESULT;
Status on_raw_packet(const td::mtproto::PacketInfo &info, BufferSlice packet) override; Status on_raw_packet(const td::mtproto::PacketInfo &info, BufferSlice packet) override;
Status on_quick_ack(uint64 quick_ack_token) override; Status on_quick_ack(uint64 quick_ack_token) override;
void on_read(size_t size) override;
}; };
} // namespace mtproto } // namespace mtproto

View File

@ -206,20 +206,11 @@ void Session::connection_online_update(bool force) {
} }
connection_online_flag_ = new_connection_online_flag; connection_online_flag_ = new_connection_online_flag;
VLOG(dc) << "Set connection_online " << connection_online_flag_; VLOG(dc) << "Set connection_online " << connection_online_flag_;
if (is_main_) { if (main_connection_.connection) {
if (main_connection_.connection) { main_connection_.connection->set_online(connection_online_flag_, is_main_);
main_connection_.connection->set_online(connection_online_flag_); }
} if (long_poll_connection_.connection) {
if (long_poll_connection_.connection) { long_poll_connection_.connection->set_online(connection_online_flag_, is_main_);
long_poll_connection_.connection->set_online(connection_online_flag_);
}
} else {
// TODO: support online state in media connections.
if (connection_online_flag_) {
connection_close(&main_connection_);
connection_close(&long_poll_connection_);
}
return;
} }
} }
@ -967,11 +958,10 @@ void Session::connection_open_finish(ConnectionInfo *info,
} }
auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->debug_str_; auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->debug_str_;
LOG(INFO) << "connection_open_finish: " << name; LOG(INFO) << "connection_open_finish: " << name;
//LOG(ERROR) << "connection_open_finish: " << name;
info->connection = info->connection =
make_unique<mtproto::SessionConnection>(mode, std::move(raw_connection), &auth_data_, DhCache::instance()); make_unique<mtproto::SessionConnection>(mode, std::move(raw_connection), &auth_data_, DhCache::instance());
if (is_main_) { info->connection->set_online(connection_online_flag_, is_main_);
info->connection->set_online(connection_online_flag_);
}
info->connection->set_name(name); info->connection->set_name(name);
Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this)); Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this));
info->mode = mode_; info->mode = mode_;