diff --git a/CMakeLists.txt b/CMakeLists.txt index 3ec8c4566..d51bca79c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -343,6 +343,8 @@ set(TDLIB_SOURCE td/mtproto/HandshakeActor.cpp td/mtproto/HttpTransport.cpp td/mtproto/IStreamTransport.cpp + td/mtproto/Ping.cpp + td/mtproto/PingConnection.cpp td/mtproto/RawConnection.cpp td/mtproto/SessionConnection.cpp td/mtproto/TcpTransport.cpp @@ -467,6 +469,7 @@ set(TDLIB_SOURCE td/mtproto/NoCryptoStorer.h td/mtproto/PacketInfo.h td/mtproto/PacketStorer.h + td/mtproto/Ping.h td/mtproto/PingConnection.h td/mtproto/Query.h td/mtproto/RawConnection.h diff --git a/td/mtproto/Ping.cpp b/td/mtproto/Ping.cpp new file mode 100644 index 000000000..1fbaf21a3 --- /dev/null +++ b/td/mtproto/Ping.cpp @@ -0,0 +1,101 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/mtproto/Ping.h" + +#include "td/mtproto/AuthData.h" +#include "td/mtproto/PingConnection.h" + +#include "td/utils/crypto.h" +#include "td/utils/Random.h" +namespace td { +namespace mtproto { +ActorOwn<> create_ping_actor(std::string debug, unique_ptr raw_connection, + unique_ptr auth_data, + Promise> promise, ActorShared<> parent) { + class PingActor : public Actor { + public: + PingActor(unique_ptr raw_connection, unique_ptr auth_data, + Promise> promise, ActorShared<> parent) + : promise_(std::move(promise)), parent_(std::move(parent)) { + if (auth_data) { + ping_connection_ = mtproto::PingConnection::create_ping_pong(std::move(raw_connection), std::move(auth_data)); + } else { + ping_connection_ = mtproto::PingConnection::create_req_pq(std::move(raw_connection), 2); + } + } + + private: + unique_ptr ping_connection_; + Promise> promise_; + ActorShared<> parent_; + + void start_up() override { + Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); + set_timeout_in(10); + yield(); + } + + void hangup() override { + finish(Status::Error("Cancelled")); + stop(); + } + + void tear_down() override { + finish(Status::OK()); + } + + void loop() override { + auto status = ping_connection_->flush(); + if (status.is_error()) { + finish(std::move(status)); + return stop(); + } + if (ping_connection_->was_pong()) { + finish(Status::OK()); + return stop(); + } + } + + void timeout_expired() override { + finish(Status::Error("Pong timeout expired")); + stop(); + } + + void finish(Status status) { + auto raw_connection = ping_connection_->move_as_raw_connection(); + if (!raw_connection) { + CHECK(!promise_); + return; + } + Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); + if (promise_) { + if (status.is_error()) { + if (raw_connection->stats_callback()) { + raw_connection->stats_callback()->on_error(); + } + raw_connection->close(); + promise_.set_error(std::move(status)); + } else { + raw_connection->rtt_ = ping_connection_->rtt(); + if (raw_connection->stats_callback()) { + raw_connection->stats_callback()->on_pong(); + } + promise_.set_value(std::move(raw_connection)); + } + } else { + if (raw_connection->stats_callback()) { + raw_connection->stats_callback()->on_error(); + } + raw_connection->close(); + } + } + }; + return ActorOwn<>(create_actor(PSLICE() << "PingActor<" << debug << ">", std::move(raw_connection), + std::move(auth_data), std::move(promise), std::move(parent))); +} +} // namespace mtproto +} // namespace td diff --git a/td/mtproto/Ping.h b/td/mtproto/Ping.h new file mode 100644 index 000000000..7343edd3d --- /dev/null +++ b/td/mtproto/Ping.h @@ -0,0 +1,20 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/mtproto/RawConnection.h" +#include "td/mtproto/SessionConnection.h" +#include "td/mtproto/Handshake.h" + +#include "td/actor/actor.h" +#include "td/actor/PromiseFuture.h" +namespace td { +namespace mtproto { +ActorOwn<> create_ping_actor(std::string debug, unique_ptr raw_connection, + unique_ptr auth_data, + Promise> promise, ActorShared<> parent); +} +} // namespace td diff --git a/td/mtproto/PingConnection.cpp b/td/mtproto/PingConnection.cpp new file mode 100644 index 000000000..f3f1a2e7b --- /dev/null +++ b/td/mtproto/PingConnection.cpp @@ -0,0 +1,168 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/mtproto/PingConnection.h" +#include "td/mtproto/SessionConnection.h" +namespace td { +namespace mtproto { +namespace detail { +class PingConnectionReqPQ + : public PingConnection + , private RawConnection::Callback { + public: + PingConnectionReqPQ(unique_ptr raw_connection, size_t ping_count) + : raw_connection_(std::move(raw_connection)), ping_count_(ping_count) { + } + + PollableFdInfo &get_poll_info() override { + return raw_connection_->get_poll_info(); + } + + unique_ptr move_as_raw_connection() override { + return std::move(raw_connection_); + } + + Status flush() override { + if (!was_ping_) { + UInt128 nonce; + Random::secure_bytes(nonce.raw, sizeof(nonce)); + raw_connection_->send_no_crypto(PacketStorer(1, create_storer(mtproto_api::req_pq_multi(nonce)))); + was_ping_ = true; + if (ping_count_ == 1) { + start_time_ = Time::now(); + } + } + return raw_connection_->flush(AuthKey(), *this); + } + bool was_pong() const override { + return finish_time_ > 0; + } + double rtt() const override { + return finish_time_ - start_time_; + } + + Status on_raw_packet(const PacketInfo &packet_info, BufferSlice packet) override { + if (packet.size() < 12) { + return Status::Error("Result is too small"); + } + packet.confirm_read(12); + // TODO: fetch_result + + if (--ping_count_ > 0) { + was_ping_ = false; + return flush(); + } else { + finish_time_ = Time::now(); + return Status::OK(); + } + } + + private: + unique_ptr raw_connection_; + size_t ping_count_ = 1; + double start_time_ = 0.0; + double finish_time_ = 0.0; + bool was_ping_ = false; +}; + +class PingConnectionPingPong + : public PingConnection + , private SessionConnection::Callback { + public: + PingConnectionPingPong(unique_ptr raw_connection, unique_ptr auth_data) + : auth_data_(std::move(auth_data)) { + connection_ = + make_unique(SessionConnection::Mode::Tcp, std::move(raw_connection), auth_data_.get()); + } + + private: + unique_ptr auth_data_; + unique_ptr connection_; + bool was_pong_{false}; + bool is_closed_{false}; + Status status_; + void on_connected() override { + } + void on_before_close() override { + Scheduler::unsubscribe_before_close(connection_->get_poll_info().get_pollable_fd_ref()); + } + void on_closed(Status status) override { + is_closed_ = true; + status_ = std::move(status); + } + + void on_auth_key_updated() override { + } + void on_tmp_auth_key_updated() override { + } + void on_server_salt_updated() override { + } + void on_server_time_difference_updated() override { + } + + void on_session_created(uint64 unique_id, uint64 first_id) override { + } + void on_session_failed(Status status) override { + } + + void on_container_sent(uint64 container_id, vector msgs_id) override { + } + Status on_pong() override { + was_pong_ = true; + return Status::OK(); + } + + void on_message_ack(uint64 id) override { + } + Status on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) override { + LOG(ERROR) << "Unexpected message"; + return Status::OK(); + } + void on_message_result_error(uint64 id, int code, BufferSlice descr) override { + } + void on_message_failed(uint64 id, Status status) override { + } + void on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) override { + } + + Status on_destroy_auth_key() override { + LOG(ERROR) << "Destroy auth key"; + return Status::OK(); + } + PollableFdInfo &get_poll_info() override { + return connection_->get_poll_info(); + } + unique_ptr move_as_raw_connection() override { + return connection_->move_as_raw_connection(); + } + Status flush() override { + if (was_pong_) { + return Status::OK(); + } + connection_->flush(this); + if (is_closed_) { + return std::move(status_); + } + return Status::OK(); + } + bool was_pong() const override { + return was_pong_; + } + double rtt() const override { + return 1; + } +}; + +} // namespace detail +unique_ptr PingConnection::create_req_pq(unique_ptr raw_connection, size_t ping_count) { + return make_unique(std::move(raw_connection), ping_count); +} +unique_ptr PingConnection::create_ping_pong(unique_ptr raw_connection, + unique_ptr auth_data) { + return make_unique(std::move(raw_connection), std::move(auth_data)); +} +} // namespace mtproto +} // namespace td diff --git a/td/mtproto/PingConnection.h b/td/mtproto/PingConnection.h index 5f2bab7c7..eddf32df0 100644 --- a/td/mtproto/PingConnection.h +++ b/td/mtproto/PingConnection.h @@ -7,6 +7,7 @@ #pragma once #include "td/mtproto/AuthKey.h" +#include "td/mtproto/AuthData.h" #include "td/mtproto/NoCryptoStorer.h" #include "td/mtproto/PacketInfo.h" #include "td/mtproto/PacketStorer.h" @@ -25,65 +26,18 @@ namespace td { namespace mtproto { -class PingConnection : private RawConnection::Callback { +class PingConnection { public: - PingConnection(unique_ptr raw_connection, size_t ping_count) - : raw_connection_(std::move(raw_connection)), ping_count_(ping_count) { - } + virtual ~PingConnection() = default; + virtual PollableFdInfo &get_poll_info() = 0; + virtual unique_ptr move_as_raw_connection() = 0; + virtual Status flush() = 0; + virtual bool was_pong() const = 0; + virtual double rtt() const = 0; - PollableFdInfo &get_poll_info() { - return raw_connection_->get_poll_info(); - } - - unique_ptr move_as_raw_connection() { - return std::move(raw_connection_); - } - - void close() { - raw_connection_->close(); - } - - Status flush() { - if (!was_ping_) { - UInt128 nonce; - Random::secure_bytes(nonce.raw, sizeof(nonce)); - raw_connection_->send_no_crypto(PacketStorer(1, create_storer(mtproto_api::req_pq_multi(nonce)))); - was_ping_ = true; - if (ping_count_ == 1) { - start_time_ = Time::now(); - } - } - return raw_connection_->flush(AuthKey(), *this); - } - bool was_pong() const { - return finish_time_ > 0; - } - double rtt() const { - return finish_time_ - start_time_; - } - - Status on_raw_packet(const PacketInfo &packet_info, BufferSlice packet) override { - if (packet.size() < 12) { - return Status::Error("Result is too small"); - } - packet.confirm_read(12); - // TODO: fetch_result - - if (--ping_count_ > 0) { - was_ping_ = false; - return flush(); - } else { - finish_time_ = Time::now(); - return Status::OK(); - } - } - - private: - unique_ptr raw_connection_; - size_t ping_count_ = 1; - double start_time_ = 0.0; - double finish_time_ = 0.0; - bool was_ping_ = false; + static unique_ptr create_req_pq(unique_ptr raw_connection, size_t ping_count); + static unique_ptr create_ping_pong(unique_ptr raw_connection, + unique_ptr auth_data); }; } // namespace mtproto diff --git a/td/mtproto/SessionConnection.cpp b/td/mtproto/SessionConnection.cpp index 7bc85ccba..f59467758 100644 --- a/td/mtproto/SessionConnection.cpp +++ b/td/mtproto/SessionConnection.cpp @@ -182,6 +182,10 @@ class OnPacket { } }; +unique_ptr SessionConnection::move_as_raw_connection() { + return std::move(raw_connection_); +} + /*** SessionConnection ***/ BufferSlice SessionConnection::as_buffer_slice(Slice packet) { return current_buffer_slice_->from_slice(packet); @@ -978,6 +982,7 @@ void SessionConnection::send_before(double tm) { } Status SessionConnection::do_flush() { + CHECK(raw_connection_); CHECK(state_ != Closed); if (state_ == Init) { TRY_STATUS(init()); diff --git a/td/mtproto/SessionConnection.h b/td/mtproto/SessionConnection.h index 635c2fa54..dcb9e7cc1 100644 --- a/td/mtproto/SessionConnection.h +++ b/td/mtproto/SessionConnection.h @@ -68,6 +68,7 @@ class SessionConnection SessionConnection(Mode mode, unique_ptr raw_connection, AuthData *auth_data); PollableFdInfo &get_poll_info(); + unique_ptr move_as_raw_connection(); // Interface Result TD_WARN_UNUSED_RESULT send_query(BufferSlice buffer, bool gzip_flag, int64 message_id = 0, diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 74ff10624..fe5ded4e7 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -19,7 +19,7 @@ #include "td/telegram/StateManager.h" #include "td/telegram/TdDb.h" -#include "td/mtproto/PingConnection.h" +#include "td/mtproto/Ping.h" #include "td/mtproto/RawConnection.h" #include "td/net/GetHostByNameActor.h" @@ -86,81 +86,6 @@ class StatsCallback final : public mtproto::RawConnection::StatsCallback { DcOptionsSet::Stat *option_stat_; }; -class PingActor : public Actor { - public: - PingActor(unique_ptr raw_connection, Promise> promise, - ActorShared<> parent) - : promise_(std::move(promise)), parent_(std::move(parent)) { - ping_connection_ = make_unique(std::move(raw_connection), 2); - } - - private: - unique_ptr ping_connection_; - Promise> promise_; - ActorShared<> parent_; - - void start_up() override { - Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this)); - set_timeout_in(10); - yield(); - } - - void hangup() override { - finish(Status::Error("Cancelled")); - stop(); - } - - void tear_down() override { - finish(Status::OK()); - } - - void loop() override { - auto status = ping_connection_->flush(); - if (status.is_error()) { - finish(std::move(status)); - return stop(); - } - if (ping_connection_->was_pong()) { - finish(Status::OK()); - return stop(); - } - } - - void timeout_expired() override { - finish(Status::Error("Pong timeout expired")); - stop(); - } - - void finish(Status status) { - auto raw_connection = ping_connection_->move_as_raw_connection(); - if (!raw_connection) { - CHECK(!promise_); - return; - } - Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref()); - if (promise_) { - if (status.is_error()) { - if (raw_connection->stats_callback()) { - raw_connection->stats_callback()->on_error(); - } - raw_connection->close(); - promise_.set_error(std::move(status)); - } else { - raw_connection->rtt_ = ping_connection_->rtt(); - if (raw_connection->stats_callback()) { - raw_connection->stats_callback()->on_pong(); - } - promise_.set_value(std::move(raw_connection)); - } - } else { - if (raw_connection->stats_callback()) { - raw_connection->stats_callback()->on_error(); - } - raw_connection->close(); - } - } -}; - } // namespace detail class ConnectionCreator::ProxyInfo { @@ -602,17 +527,17 @@ void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::Transp Promise promise) { auto token = next_token(); auto raw_connection = make_unique(std::move(socket_fd), std::move(transport_type), nullptr); - children_[token] = {false, create_actor( - "PingActor", std::move(raw_connection), - PromiseCreator::lambda([promise = std::move(promise)]( - Result> result) mutable { - if (result.is_error()) { - return promise.set_error(Status::Error(400, result.error().message())); - } - auto ping_time = result.ok()->rtt_; - promise.set_value(std::move(ping_time)); - }), - create_reference(token))}; + children_[token] = { + false, create_ping_actor("", std::move(raw_connection), nullptr, + PromiseCreator::lambda([promise = std::move(promise)]( + Result> result) mutable { + if (result.is_error()) { + return promise.set_error(Status::Error(400, result.error().message())); + } + auto ping_time = result.ok()->rtt_; + promise.set_value(std::move(ping_time)); + }), + create_reference(token))}; } void ConnectionCreator::set_active_proxy_id(int32 proxy_id, bool from_binlog) { @@ -805,7 +730,8 @@ void ConnectionCreator::on_mtproto_error(size_t hash) { } void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media, - Promise> promise, size_t hash) { + Promise> promise, size_t hash, + unique_ptr auth_data) { auto &client = clients_[hash]; if (!client.inited) { client.inited = true; @@ -813,6 +739,7 @@ void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only client.dc_id = dc_id; client.allow_media_only = allow_media_only; client.is_media = is_media; + client.auth_data = std::move(auth_data); } else { CHECK(client.hash == hash); CHECK(client.dc_id == dc_id); @@ -1111,9 +1038,8 @@ void ConnectionCreator::client_create_raw_connection(Result r_co if (check_mode) { VLOG(connections) << "Start check: " << debug_str; auto token = next_token(); - children_[token] = { - true, create_actor(PSLICE() << "PingActor<" << debug_str << ">", std::move(raw_connection), - std::move(promise), create_reference(token))}; + children_[token] = {true, create_ping_actor(debug_str, std::move(raw_connection), nullptr, std::move(promise), + create_reference(token))}; } else { promise.set_value(std::move(raw_connection)); } diff --git a/td/telegram/net/ConnectionCreator.h b/td/telegram/net/ConnectionCreator.h index 2e01fe2b4..a77443556 100644 --- a/td/telegram/net/ConnectionCreator.h +++ b/td/telegram/net/ConnectionCreator.h @@ -15,6 +15,7 @@ #include "td/telegram/net/NetQuery.h" #include "td/telegram/StateManager.h" +#include "td/mtproto/AuthData.h" #include "td/mtproto/TransportType.h" #include "td/actor/actor.h" @@ -156,7 +157,8 @@ class ConnectionCreator : public NetQueryCallback { void on_pong(size_t hash); void on_mtproto_error(size_t hash); void request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media, - Promise> promise, size_t hash = 0); + Promise> promise, size_t hash = 0, + unique_ptr auth_data = {}); void request_raw_connection_by_ip(IPAddress ip_address, Promise> promise); void set_net_stats_callback(std::shared_ptr common_callback, @@ -237,6 +239,7 @@ class ConnectionCreator : public NetQueryCallback { DcId dc_id; bool allow_media_only; bool is_media; + unique_ptr auth_data; }; std::map clients_; diff --git a/td/telegram/net/SessionProxy.cpp b/td/telegram/net/SessionProxy.cpp index 862e26436..cf23d32b9 100644 --- a/td/telegram/net/SessionProxy.cpp +++ b/td/telegram/net/SessionProxy.cpp @@ -45,7 +45,7 @@ class SessionCallback : public Session::Callback { } void request_raw_connection(Promise> promise) override { send_closure(G()->connection_creator(), &ConnectionCreator::request_raw_connection, dc_id_, allow_media_only_, - is_media_, std::move(promise), hash_); + is_media_, std::move(promise), hash_, nullptr); } void on_tmp_auth_key_updated(mtproto::AuthKey auth_key) override { diff --git a/test/mtproto.cpp b/test/mtproto.cpp index ef6384ef8..4e4970e77 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -14,6 +14,7 @@ #include "td/mtproto/Handshake.h" #include "td/mtproto/HandshakeActor.h" #include "td/mtproto/PingConnection.h" +#include "td/mtproto/Ping.h" #include "td/mtproto/RawConnection.h" #include "td/mtproto/TransportType.h" @@ -22,6 +23,7 @@ #include "td/net/TransparentProxy.h" #include "td/telegram/ConfigManager.h" +#include "td/telegram/net/Session.h" #include "td/telegram/net/DcId.h" #include "td/telegram/net/PublicRsaKeyShared.h" #include "td/telegram/NotificationManager.h" @@ -170,7 +172,7 @@ class TestPingActor : public Actor { Status *result_; void start_up() override { - ping_connection_ = make_unique( + ping_connection_ = mtproto::PingConnection::create_req_pq( make_unique(SocketFd::open(ip_address_).move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr), 3); @@ -181,7 +183,6 @@ class TestPingActor : public Actor { } void tear_down() override { Scheduler::unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref()); - ping_connection_->close(); Scheduler::instance()->finish(); } @@ -463,3 +464,122 @@ TEST(Mtproto, notifications) { ASSERT_EQ(decrypted_payload, NotificationManager::decrypt_push(key_id, key, push).ok()); } } + +class FastPingTestActor : public Actor { + public: + explicit FastPingTestActor(Status *result) : result_(result) { + } + + private: + Status *result_; + unique_ptr connection_; + unique_ptr handshake_; + ActorOwn<> fast_ping_; + int iteration_{0}; + + void start_up() override { + // Run handshake to create key and salt + auto raw_connection = + make_unique(SocketFd::open(get_default_ip_address()).move_as_ok(), + mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr); + auto handshake = make_unique(get_default_dc_id(), 60 * 100 /*temp*/); + create_actor( + "HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique(), 10.0, + PromiseCreator::lambda([self = actor_id(this)](Result> raw_connection) { + send_closure(self, &FastPingTestActor::got_connection, std::move(raw_connection), 1); + }), + PromiseCreator::lambda([self = actor_id(this)](Result> handshake) { + send_closure(self, &FastPingTestActor::got_handshake, std::move(handshake), 1); + })) + .release(); + } + void got_connection(Result> r_raw_connection, int32 dummy) { + if (r_raw_connection.is_error()) { + *result_ = r_raw_connection.move_as_error(); + return stop(); + } + connection_ = r_raw_connection.move_as_ok(); + loop(); + } + + void got_handshake(Result> r_handshake, int32 dummy) { + if (r_handshake.is_error()) { + *result_ = r_handshake.move_as_error(); + return stop(); + } + handshake_ = r_handshake.move_as_ok(); + loop(); + } + + void got_raw_connection(Result> r_connection) { + if (r_connection.is_error()) { + Scheduler::instance()->finish(); + *result_ = r_connection.move_as_error(); + return stop(); + } + connection_ = r_connection.move_as_ok(); + LOG(ERROR) << "RTT: " << connection_->rtt_; + connection_->rtt_ = 0; + loop(); + } + + void loop() override { + if (handshake_ && connection_) { + LOG(ERROR) << iteration_; + if (iteration_ == 6) { + Scheduler::instance()->finish(); + return stop(); + } + unique_ptr auth_data; + if (iteration_ % 2 == 0) { + auth_data = make_unique(); + auth_data->set_tmp_auth_key(handshake_->auth_key); + auth_data->set_server_time_difference(handshake_->server_time_diff); + auth_data->set_server_salt(handshake_->server_salt, Time::now()); + auth_data->set_future_salts({mtproto::ServerSalt{0u, 1e20, 1e30}}, Time::now()); + auth_data->set_use_pfs(true); + uint64 session_id = 0; + do { + Random::secure_bytes(reinterpret_cast(&session_id), sizeof(session_id)); + } while (session_id == 0); + auth_data->set_session_id(session_id); + } + iteration_++; + fast_ping_ = create_ping_actor( + "", std::move(connection_), std::move(auth_data), + PromiseCreator::lambda([self = actor_id(this)](Result> r_raw_connection) { + send_closure(self, &FastPingTestActor::got_raw_connection, std::move(r_raw_connection)); + }), + ActorShared<>()); + } + } +}; + +class Mtproto_FastPing : public Test { + public: + using Test::Test; + bool step() final { + if (!is_inited_) { + sched_.init(0); + sched_.create_actor_unsafe(0, "FastPingTestActor", &result_).release(); + sched_.start(); + is_inited_ = true; + } + + bool ret = sched_.run_main(10); + if (ret) { + return true; + } + sched_.finish(); + if (result_.is_error()) { + LOG(ERROR) << result_; + } + return false; + } + + private: + bool is_inited_ = false; + ConcurrentScheduler sched_; + Status result_; +}; +RegisterTest mtproto_fastping("Mtproto_FastPing");