From c6dd53a76edf248091aa92cd31182cd4e4aa2f2b Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Tue, 20 Nov 2018 16:07:27 +0400 Subject: [PATCH] Session: set_online for nonmain connections GitOrigin-RevId: 9bcfcdbaaf2440a307f221ab07aa650c449c3d26 --- td/mtproto/RawConnection.cpp | 11 +++++++++-- td/mtproto/RawConnection.h | 2 ++ td/mtproto/SessionConnection.cpp | 28 +++++++++++++++++++++++----- td/mtproto/SessionConnection.h | 11 +++++++++-- td/telegram/net/Session.cpp | 24 +++++++----------------- 5 files changed, 50 insertions(+), 26 deletions(-) diff --git a/td/mtproto/RawConnection.cpp b/td/mtproto/RawConnection.cpp index cbd3dfc3..3c15547b 100644 --- a/td/mtproto/RawConnection.cpp +++ b/td/mtproto/RawConnection.cpp @@ -58,8 +58,11 @@ uint64 RawConnection::send_no_crypto(const Storer &storer) { Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) { auto r = socket_fd_.flush_read(); - if (r.is_ok() && stats_callback_) { - stats_callback_->on_read(r.ok()); + if (r.is_ok()) { + if (stats_callback_) { + stats_callback_->on_read(r.ok()); + } + callback.on_read(r.ok()); } while (transport_->can_read()) { BufferSlice packet; @@ -73,6 +76,10 @@ Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) { CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin())) << packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size; if (wait_size != 0) { + constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024; + if (wait_size > MAX_PACKET_SIZE) { + return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size); + } break; } diff --git a/td/mtproto/RawConnection.h b/td/mtproto/RawConnection.h index 1e4572c2..3d04b4ba 100644 --- a/td/mtproto/RawConnection.h +++ b/td/mtproto/RawConnection.h @@ -83,6 +83,8 @@ class RawConnection { virtual Status before_write() { return Status::OK(); } + virtual void on_read(size_t size) { + } }; // NB: After first returned error, all subsequent calls will return error too. diff --git a/td/mtproto/SessionConnection.cpp b/td/mtproto/SessionConnection.cpp index c960b158..dedd406f 100644 --- a/td/mtproto/SessionConnection.cpp +++ b/td/mtproto/SessionConnection.cpp @@ -680,6 +680,11 @@ Status SessionConnection::on_quick_ack(uint64 quick_ack_token) { callback_->on_message_ack(quick_ack_token); return Status::OK(); } + +void SessionConnection::on_read(size_t size) { + last_read_at_ = Time::now_cached(); +} + SessionConnection::SessionConnection(Mode mode, unique_ptr raw_connection, AuthData *auth_data, DhCallback *dh_callback) : raw_connection_(std::move(raw_connection)), auth_data_(auth_data), dh_callback_(dh_callback) { @@ -695,16 +700,21 @@ PollableFdInfo &SessionConnection::get_poll_info() { Status SessionConnection::init() { CHECK(state_ == Init); last_pong_at_ = Time::now_cached(); + last_read_at_ = Time::now_cached(); state_ = Run; return Status::OK(); } -void SessionConnection::set_online(bool online_flag) { +void SessionConnection::set_online(bool online_flag, bool is_main) { online_flag_ = online_flag; + is_main_ = is_main; + auto now = Time::now(); if (online_flag_) { - last_pong_at_ = Time::now() - ping_disconnect_delay() + rtt(); + last_pong_at_ = now - ping_disconnect_delay() + rtt(); + last_read_at_ = now - read_disconnect_delay() + rtt(); } else { - last_pong_at_ = Time::now(); + last_pong_at_ = now; + last_read_at_ = now; } last_ping_at_ = 0; last_ping_message_id_ = 0; @@ -829,7 +839,8 @@ void SessionConnection::flush_packet() { if (mode_ == Mode::HttpLongPoll) { max_delay = HTTP_MAX_DELAY; max_after = HTTP_MAX_AFTER; - auto time_to_disconnect = ping_disconnect_delay() + last_pong_at_ - Time::now_cached(); + auto time_to_disconnect = + std::min(ping_disconnect_delay() + last_pong_at_, read_disconnect_delay() + last_read_at_) - Time::now_cached(); max_wait = min(http_max_wait(), static_cast(1000 * max(0.1, time_to_disconnect - rtt()))); } else if (mode_ == Mode::Http) { max_delay = HTTP_MAX_DELAY; @@ -971,7 +982,13 @@ Status SessionConnection::do_flush() { // check last pong if (last_pong_at_ != 0 && last_pong_at_ + ping_disconnect_delay() < Time::now_cached()) { raw_connection_->stats_callback()->on_error(); - return Status::Error("No pong :("); + return Status::Error(PSLICE() << "No pong :( " << tag("rtt", rtt()) << tag("delay", ping_disconnect_delay())); + } + + // check last pong + if (last_read_at_ != 0 && last_read_at_ + read_disconnect_delay() < Time::now_cached()) { + raw_connection_->stats_callback()->on_error(); + return Status::Error("No read :("); } return Status::OK(); @@ -992,6 +1009,7 @@ double SessionConnection::flush(SessionConnection::Callback *callback) { // 1. close connection after PING_DISCONNECT_DELAY after last_pong. // 2. the one returned by must_flush_packet relax_timeout_at(&wakeup_at_, last_pong_at_ + ping_disconnect_delay() + 0.002); + relax_timeout_at(&wakeup_at_, last_read_at_ + read_disconnect_delay() + 0.002); // CHECK(wakeup_at > Time::now_cached()); relax_timeout_at(&wakeup_at_, flush_packet_at_); diff --git a/td/mtproto/SessionConnection.h b/td/mtproto/SessionConnection.h index 4f5e196a..777e0be4 100644 --- a/td/mtproto/SessionConnection.h +++ b/td/mtproto/SessionConnection.h @@ -79,7 +79,7 @@ class SessionConnection void cancel_answer(int64 message_id); void destroy_key(); - void set_online(bool online_flag); + void set_online(bool online_flag, bool is_main); // Callback class Callback { @@ -124,13 +124,18 @@ class SessionConnection static constexpr double RESEND_ANSWER_DELAY = 0.001; // 0.001s bool online_flag_ = false; + bool is_main_ = false; int rtt() const { return max(2, static_cast(raw_connection_->rtt_ * 1.5 + 1)); } + int32 read_disconnect_delay() { + return online_flag_ ? rtt() * 7 / 2 : 135; + } + int32 ping_disconnect_delay() const { - return online_flag_ ? rtt() * 5 / 2 : 135; + return (online_flag_ && is_main_) ? rtt() * 5 / 2 : 135; } int32 ping_may_delay() const { @@ -164,6 +169,7 @@ class SessionConnection // nobody cleans up this map. But it should be really small. std::unordered_map> container_to_service_msg_; + double last_read_at_ = 0; double last_ping_at_ = 0; double last_pong_at_ = 0; int64 cur_ping_id_ = 0; @@ -257,6 +263,7 @@ class SessionConnection Status before_write() override TD_WARN_UNUSED_RESULT; Status on_raw_packet(const td::mtproto::PacketInfo &info, BufferSlice packet) override; Status on_quick_ack(uint64 quick_ack_token) override; + void on_read(size_t size) override; }; } // namespace mtproto diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 65cd1f37..1bcfbc0e 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -206,20 +206,11 @@ void Session::connection_online_update(bool force) { } connection_online_flag_ = new_connection_online_flag; VLOG(dc) << "Set connection_online " << connection_online_flag_; - if (is_main_) { - if (main_connection_.connection) { - main_connection_.connection->set_online(connection_online_flag_); - } - if (long_poll_connection_.connection) { - long_poll_connection_.connection->set_online(connection_online_flag_); - } - } else { - // TODO: support online state in media connections. - if (connection_online_flag_) { - connection_close(&main_connection_); - connection_close(&long_poll_connection_); - } - return; + if (main_connection_.connection) { + main_connection_.connection->set_online(connection_online_flag_, is_main_); + } + if (long_poll_connection_.connection) { + long_poll_connection_.connection->set_online(connection_online_flag_, is_main_); } } @@ -967,11 +958,10 @@ void Session::connection_open_finish(ConnectionInfo *info, } auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->debug_str_; LOG(INFO) << "connection_open_finish: " << name; + //LOG(ERROR) << "connection_open_finish: " << name; info->connection = make_unique(mode, std::move(raw_connection), &auth_data_, DhCache::instance()); - if (is_main_) { - info->connection->set_online(connection_online_flag_); - } + info->connection->set_online(connection_online_flag_, is_main_); info->connection->set_name(name); Scheduler::subscribe(info->connection->get_poll_info().extract_pollable_fd(this)); info->mode = mode_;