mtproto::Ping two variants with mtproto_api::ping and mtproto_api::req_pq
GitOrigin-RevId: 196d7af132a791615c867cbdbfa23a2fa48327e9
This commit is contained in:
parent
1aa86c15be
commit
be006f6cb4
|
@ -343,6 +343,8 @@ set(TDLIB_SOURCE
|
||||||
td/mtproto/HandshakeActor.cpp
|
td/mtproto/HandshakeActor.cpp
|
||||||
td/mtproto/HttpTransport.cpp
|
td/mtproto/HttpTransport.cpp
|
||||||
td/mtproto/IStreamTransport.cpp
|
td/mtproto/IStreamTransport.cpp
|
||||||
|
td/mtproto/Ping.cpp
|
||||||
|
td/mtproto/PingConnection.cpp
|
||||||
td/mtproto/RawConnection.cpp
|
td/mtproto/RawConnection.cpp
|
||||||
td/mtproto/SessionConnection.cpp
|
td/mtproto/SessionConnection.cpp
|
||||||
td/mtproto/TcpTransport.cpp
|
td/mtproto/TcpTransport.cpp
|
||||||
|
@ -467,6 +469,7 @@ set(TDLIB_SOURCE
|
||||||
td/mtproto/NoCryptoStorer.h
|
td/mtproto/NoCryptoStorer.h
|
||||||
td/mtproto/PacketInfo.h
|
td/mtproto/PacketInfo.h
|
||||||
td/mtproto/PacketStorer.h
|
td/mtproto/PacketStorer.h
|
||||||
|
td/mtproto/Ping.h
|
||||||
td/mtproto/PingConnection.h
|
td/mtproto/PingConnection.h
|
||||||
td/mtproto/Query.h
|
td/mtproto/Query.h
|
||||||
td/mtproto/RawConnection.h
|
td/mtproto/RawConnection.h
|
||||||
|
|
101
td/mtproto/Ping.cpp
Normal file
101
td/mtproto/Ping.cpp
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
//
|
||||||
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||||
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
#include "td/mtproto/Ping.h"
|
||||||
|
|
||||||
|
#include "td/mtproto/AuthData.h"
|
||||||
|
#include "td/mtproto/PingConnection.h"
|
||||||
|
|
||||||
|
#include "td/utils/crypto.h"
|
||||||
|
#include "td/utils/Random.h"
|
||||||
|
namespace td {
|
||||||
|
namespace mtproto {
|
||||||
|
ActorOwn<> create_ping_actor(std::string debug, unique_ptr<mtproto::RawConnection> raw_connection,
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data,
|
||||||
|
Promise<unique_ptr<mtproto::RawConnection>> promise, ActorShared<> parent) {
|
||||||
|
class PingActor : public Actor {
|
||||||
|
public:
|
||||||
|
PingActor(unique_ptr<mtproto::RawConnection> raw_connection, unique_ptr<mtproto::AuthData> auth_data,
|
||||||
|
Promise<unique_ptr<mtproto::RawConnection>> promise, ActorShared<> parent)
|
||||||
|
: promise_(std::move(promise)), parent_(std::move(parent)) {
|
||||||
|
if (auth_data) {
|
||||||
|
ping_connection_ = mtproto::PingConnection::create_ping_pong(std::move(raw_connection), std::move(auth_data));
|
||||||
|
} else {
|
||||||
|
ping_connection_ = mtproto::PingConnection::create_req_pq(std::move(raw_connection), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
unique_ptr<mtproto::PingConnection> ping_connection_;
|
||||||
|
Promise<unique_ptr<mtproto::RawConnection>> promise_;
|
||||||
|
ActorShared<> parent_;
|
||||||
|
|
||||||
|
void start_up() override {
|
||||||
|
Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
||||||
|
set_timeout_in(10);
|
||||||
|
yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
void hangup() override {
|
||||||
|
finish(Status::Error("Cancelled"));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void tear_down() override {
|
||||||
|
finish(Status::OK());
|
||||||
|
}
|
||||||
|
|
||||||
|
void loop() override {
|
||||||
|
auto status = ping_connection_->flush();
|
||||||
|
if (status.is_error()) {
|
||||||
|
finish(std::move(status));
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
if (ping_connection_->was_pong()) {
|
||||||
|
finish(Status::OK());
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void timeout_expired() override {
|
||||||
|
finish(Status::Error("Pong timeout expired"));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void finish(Status status) {
|
||||||
|
auto raw_connection = ping_connection_->move_as_raw_connection();
|
||||||
|
if (!raw_connection) {
|
||||||
|
CHECK(!promise_);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
||||||
|
if (promise_) {
|
||||||
|
if (status.is_error()) {
|
||||||
|
if (raw_connection->stats_callback()) {
|
||||||
|
raw_connection->stats_callback()->on_error();
|
||||||
|
}
|
||||||
|
raw_connection->close();
|
||||||
|
promise_.set_error(std::move(status));
|
||||||
|
} else {
|
||||||
|
raw_connection->rtt_ = ping_connection_->rtt();
|
||||||
|
if (raw_connection->stats_callback()) {
|
||||||
|
raw_connection->stats_callback()->on_pong();
|
||||||
|
}
|
||||||
|
promise_.set_value(std::move(raw_connection));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (raw_connection->stats_callback()) {
|
||||||
|
raw_connection->stats_callback()->on_error();
|
||||||
|
}
|
||||||
|
raw_connection->close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return ActorOwn<>(create_actor<PingActor>(PSLICE() << "PingActor<" << debug << ">", std::move(raw_connection),
|
||||||
|
std::move(auth_data), std::move(promise), std::move(parent)));
|
||||||
|
}
|
||||||
|
} // namespace mtproto
|
||||||
|
} // namespace td
|
20
td/mtproto/Ping.h
Normal file
20
td/mtproto/Ping.h
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
//
|
||||||
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||||
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
#pragma once
|
||||||
|
#include "td/mtproto/RawConnection.h"
|
||||||
|
#include "td/mtproto/SessionConnection.h"
|
||||||
|
#include "td/mtproto/Handshake.h"
|
||||||
|
|
||||||
|
#include "td/actor/actor.h"
|
||||||
|
#include "td/actor/PromiseFuture.h"
|
||||||
|
namespace td {
|
||||||
|
namespace mtproto {
|
||||||
|
ActorOwn<> create_ping_actor(std::string debug, unique_ptr<mtproto::RawConnection> raw_connection,
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data,
|
||||||
|
Promise<unique_ptr<mtproto::RawConnection>> promise, ActorShared<> parent);
|
||||||
|
}
|
||||||
|
} // namespace td
|
168
td/mtproto/PingConnection.cpp
Normal file
168
td/mtproto/PingConnection.cpp
Normal file
|
@ -0,0 +1,168 @@
|
||||||
|
//
|
||||||
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
|
||||||
|
//
|
||||||
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||||
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||||
|
//
|
||||||
|
#include "td/mtproto/PingConnection.h"
|
||||||
|
#include "td/mtproto/SessionConnection.h"
|
||||||
|
namespace td {
|
||||||
|
namespace mtproto {
|
||||||
|
namespace detail {
|
||||||
|
class PingConnectionReqPQ
|
||||||
|
: public PingConnection
|
||||||
|
, private RawConnection::Callback {
|
||||||
|
public:
|
||||||
|
PingConnectionReqPQ(unique_ptr<RawConnection> raw_connection, size_t ping_count)
|
||||||
|
: raw_connection_(std::move(raw_connection)), ping_count_(ping_count) {
|
||||||
|
}
|
||||||
|
|
||||||
|
PollableFdInfo &get_poll_info() override {
|
||||||
|
return raw_connection_->get_poll_info();
|
||||||
|
}
|
||||||
|
|
||||||
|
unique_ptr<RawConnection> move_as_raw_connection() override {
|
||||||
|
return std::move(raw_connection_);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status flush() override {
|
||||||
|
if (!was_ping_) {
|
||||||
|
UInt128 nonce;
|
||||||
|
Random::secure_bytes(nonce.raw, sizeof(nonce));
|
||||||
|
raw_connection_->send_no_crypto(PacketStorer<NoCryptoImpl>(1, create_storer(mtproto_api::req_pq_multi(nonce))));
|
||||||
|
was_ping_ = true;
|
||||||
|
if (ping_count_ == 1) {
|
||||||
|
start_time_ = Time::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return raw_connection_->flush(AuthKey(), *this);
|
||||||
|
}
|
||||||
|
bool was_pong() const override {
|
||||||
|
return finish_time_ > 0;
|
||||||
|
}
|
||||||
|
double rtt() const override {
|
||||||
|
return finish_time_ - start_time_;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status on_raw_packet(const PacketInfo &packet_info, BufferSlice packet) override {
|
||||||
|
if (packet.size() < 12) {
|
||||||
|
return Status::Error("Result is too small");
|
||||||
|
}
|
||||||
|
packet.confirm_read(12);
|
||||||
|
// TODO: fetch_result
|
||||||
|
|
||||||
|
if (--ping_count_ > 0) {
|
||||||
|
was_ping_ = false;
|
||||||
|
return flush();
|
||||||
|
} else {
|
||||||
|
finish_time_ = Time::now();
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
unique_ptr<RawConnection> raw_connection_;
|
||||||
|
size_t ping_count_ = 1;
|
||||||
|
double start_time_ = 0.0;
|
||||||
|
double finish_time_ = 0.0;
|
||||||
|
bool was_ping_ = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
class PingConnectionPingPong
|
||||||
|
: public PingConnection
|
||||||
|
, private SessionConnection::Callback {
|
||||||
|
public:
|
||||||
|
PingConnectionPingPong(unique_ptr<mtproto::RawConnection> raw_connection, unique_ptr<mtproto::AuthData> auth_data)
|
||||||
|
: auth_data_(std::move(auth_data)) {
|
||||||
|
connection_ =
|
||||||
|
make_unique<SessionConnection>(SessionConnection::Mode::Tcp, std::move(raw_connection), auth_data_.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data_;
|
||||||
|
unique_ptr<mtproto::SessionConnection> connection_;
|
||||||
|
bool was_pong_{false};
|
||||||
|
bool is_closed_{false};
|
||||||
|
Status status_;
|
||||||
|
void on_connected() override {
|
||||||
|
}
|
||||||
|
void on_before_close() override {
|
||||||
|
Scheduler::unsubscribe_before_close(connection_->get_poll_info().get_pollable_fd_ref());
|
||||||
|
}
|
||||||
|
void on_closed(Status status) override {
|
||||||
|
is_closed_ = true;
|
||||||
|
status_ = std::move(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_auth_key_updated() override {
|
||||||
|
}
|
||||||
|
void on_tmp_auth_key_updated() override {
|
||||||
|
}
|
||||||
|
void on_server_salt_updated() override {
|
||||||
|
}
|
||||||
|
void on_server_time_difference_updated() override {
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_session_created(uint64 unique_id, uint64 first_id) override {
|
||||||
|
}
|
||||||
|
void on_session_failed(Status status) override {
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_container_sent(uint64 container_id, vector<uint64> msgs_id) override {
|
||||||
|
}
|
||||||
|
Status on_pong() override {
|
||||||
|
was_pong_ = true;
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_message_ack(uint64 id) override {
|
||||||
|
}
|
||||||
|
Status on_message_result_ok(uint64 id, BufferSlice packet, size_t original_size) override {
|
||||||
|
LOG(ERROR) << "Unexpected message";
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
void on_message_result_error(uint64 id, int code, BufferSlice descr) override {
|
||||||
|
}
|
||||||
|
void on_message_failed(uint64 id, Status status) override {
|
||||||
|
}
|
||||||
|
void on_message_info(uint64 id, int32 state, uint64 answer_id, int32 answer_size) override {
|
||||||
|
}
|
||||||
|
|
||||||
|
Status on_destroy_auth_key() override {
|
||||||
|
LOG(ERROR) << "Destroy auth key";
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
PollableFdInfo &get_poll_info() override {
|
||||||
|
return connection_->get_poll_info();
|
||||||
|
}
|
||||||
|
unique_ptr<RawConnection> move_as_raw_connection() override {
|
||||||
|
return connection_->move_as_raw_connection();
|
||||||
|
}
|
||||||
|
Status flush() override {
|
||||||
|
if (was_pong_) {
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
connection_->flush(this);
|
||||||
|
if (is_closed_) {
|
||||||
|
return std::move(status_);
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
bool was_pong() const override {
|
||||||
|
return was_pong_;
|
||||||
|
}
|
||||||
|
double rtt() const override {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace detail
|
||||||
|
unique_ptr<PingConnection> PingConnection::create_req_pq(unique_ptr<RawConnection> raw_connection, size_t ping_count) {
|
||||||
|
return make_unique<detail::PingConnectionReqPQ>(std::move(raw_connection), ping_count);
|
||||||
|
}
|
||||||
|
unique_ptr<PingConnection> PingConnection::create_ping_pong(unique_ptr<RawConnection> raw_connection,
|
||||||
|
unique_ptr<AuthData> auth_data) {
|
||||||
|
return make_unique<detail::PingConnectionPingPong>(std::move(raw_connection), std::move(auth_data));
|
||||||
|
}
|
||||||
|
} // namespace mtproto
|
||||||
|
} // namespace td
|
|
@ -7,6 +7,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "td/mtproto/AuthKey.h"
|
#include "td/mtproto/AuthKey.h"
|
||||||
|
#include "td/mtproto/AuthData.h"
|
||||||
#include "td/mtproto/NoCryptoStorer.h"
|
#include "td/mtproto/NoCryptoStorer.h"
|
||||||
#include "td/mtproto/PacketInfo.h"
|
#include "td/mtproto/PacketInfo.h"
|
||||||
#include "td/mtproto/PacketStorer.h"
|
#include "td/mtproto/PacketStorer.h"
|
||||||
|
@ -25,65 +26,18 @@
|
||||||
namespace td {
|
namespace td {
|
||||||
namespace mtproto {
|
namespace mtproto {
|
||||||
|
|
||||||
class PingConnection : private RawConnection::Callback {
|
class PingConnection {
|
||||||
public:
|
public:
|
||||||
PingConnection(unique_ptr<RawConnection> raw_connection, size_t ping_count)
|
virtual ~PingConnection() = default;
|
||||||
: raw_connection_(std::move(raw_connection)), ping_count_(ping_count) {
|
virtual PollableFdInfo &get_poll_info() = 0;
|
||||||
}
|
virtual unique_ptr<RawConnection> move_as_raw_connection() = 0;
|
||||||
|
virtual Status flush() = 0;
|
||||||
|
virtual bool was_pong() const = 0;
|
||||||
|
virtual double rtt() const = 0;
|
||||||
|
|
||||||
PollableFdInfo &get_poll_info() {
|
static unique_ptr<PingConnection> create_req_pq(unique_ptr<RawConnection> raw_connection, size_t ping_count);
|
||||||
return raw_connection_->get_poll_info();
|
static unique_ptr<PingConnection> create_ping_pong(unique_ptr<RawConnection> raw_connection,
|
||||||
}
|
unique_ptr<AuthData> auth_data);
|
||||||
|
|
||||||
unique_ptr<RawConnection> move_as_raw_connection() {
|
|
||||||
return std::move(raw_connection_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void close() {
|
|
||||||
raw_connection_->close();
|
|
||||||
}
|
|
||||||
|
|
||||||
Status flush() {
|
|
||||||
if (!was_ping_) {
|
|
||||||
UInt128 nonce;
|
|
||||||
Random::secure_bytes(nonce.raw, sizeof(nonce));
|
|
||||||
raw_connection_->send_no_crypto(PacketStorer<NoCryptoImpl>(1, create_storer(mtproto_api::req_pq_multi(nonce))));
|
|
||||||
was_ping_ = true;
|
|
||||||
if (ping_count_ == 1) {
|
|
||||||
start_time_ = Time::now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return raw_connection_->flush(AuthKey(), *this);
|
|
||||||
}
|
|
||||||
bool was_pong() const {
|
|
||||||
return finish_time_ > 0;
|
|
||||||
}
|
|
||||||
double rtt() const {
|
|
||||||
return finish_time_ - start_time_;
|
|
||||||
}
|
|
||||||
|
|
||||||
Status on_raw_packet(const PacketInfo &packet_info, BufferSlice packet) override {
|
|
||||||
if (packet.size() < 12) {
|
|
||||||
return Status::Error("Result is too small");
|
|
||||||
}
|
|
||||||
packet.confirm_read(12);
|
|
||||||
// TODO: fetch_result
|
|
||||||
|
|
||||||
if (--ping_count_ > 0) {
|
|
||||||
was_ping_ = false;
|
|
||||||
return flush();
|
|
||||||
} else {
|
|
||||||
finish_time_ = Time::now();
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
unique_ptr<RawConnection> raw_connection_;
|
|
||||||
size_t ping_count_ = 1;
|
|
||||||
double start_time_ = 0.0;
|
|
||||||
double finish_time_ = 0.0;
|
|
||||||
bool was_ping_ = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace mtproto
|
} // namespace mtproto
|
||||||
|
|
|
@ -182,6 +182,10 @@ class OnPacket {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
unique_ptr<RawConnection> SessionConnection::move_as_raw_connection() {
|
||||||
|
return std::move(raw_connection_);
|
||||||
|
}
|
||||||
|
|
||||||
/*** SessionConnection ***/
|
/*** SessionConnection ***/
|
||||||
BufferSlice SessionConnection::as_buffer_slice(Slice packet) {
|
BufferSlice SessionConnection::as_buffer_slice(Slice packet) {
|
||||||
return current_buffer_slice_->from_slice(packet);
|
return current_buffer_slice_->from_slice(packet);
|
||||||
|
@ -978,6 +982,7 @@ void SessionConnection::send_before(double tm) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status SessionConnection::do_flush() {
|
Status SessionConnection::do_flush() {
|
||||||
|
CHECK(raw_connection_);
|
||||||
CHECK(state_ != Closed);
|
CHECK(state_ != Closed);
|
||||||
if (state_ == Init) {
|
if (state_ == Init) {
|
||||||
TRY_STATUS(init());
|
TRY_STATUS(init());
|
||||||
|
|
|
@ -68,6 +68,7 @@ class SessionConnection
|
||||||
SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data);
|
SessionConnection(Mode mode, unique_ptr<RawConnection> raw_connection, AuthData *auth_data);
|
||||||
|
|
||||||
PollableFdInfo &get_poll_info();
|
PollableFdInfo &get_poll_info();
|
||||||
|
unique_ptr<RawConnection> move_as_raw_connection();
|
||||||
|
|
||||||
// Interface
|
// Interface
|
||||||
Result<uint64> TD_WARN_UNUSED_RESULT send_query(BufferSlice buffer, bool gzip_flag, int64 message_id = 0,
|
Result<uint64> TD_WARN_UNUSED_RESULT send_query(BufferSlice buffer, bool gzip_flag, int64 message_id = 0,
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "td/telegram/StateManager.h"
|
#include "td/telegram/StateManager.h"
|
||||||
#include "td/telegram/TdDb.h"
|
#include "td/telegram/TdDb.h"
|
||||||
|
|
||||||
#include "td/mtproto/PingConnection.h"
|
#include "td/mtproto/Ping.h"
|
||||||
#include "td/mtproto/RawConnection.h"
|
#include "td/mtproto/RawConnection.h"
|
||||||
|
|
||||||
#include "td/net/GetHostByNameActor.h"
|
#include "td/net/GetHostByNameActor.h"
|
||||||
|
@ -86,81 +86,6 @@ class StatsCallback final : public mtproto::RawConnection::StatsCallback {
|
||||||
DcOptionsSet::Stat *option_stat_;
|
DcOptionsSet::Stat *option_stat_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class PingActor : public Actor {
|
|
||||||
public:
|
|
||||||
PingActor(unique_ptr<mtproto::RawConnection> raw_connection, Promise<unique_ptr<mtproto::RawConnection>> promise,
|
|
||||||
ActorShared<> parent)
|
|
||||||
: promise_(std::move(promise)), parent_(std::move(parent)) {
|
|
||||||
ping_connection_ = make_unique<mtproto::PingConnection>(std::move(raw_connection), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
unique_ptr<mtproto::PingConnection> ping_connection_;
|
|
||||||
Promise<unique_ptr<mtproto::RawConnection>> promise_;
|
|
||||||
ActorShared<> parent_;
|
|
||||||
|
|
||||||
void start_up() override {
|
|
||||||
Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
|
|
||||||
set_timeout_in(10);
|
|
||||||
yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
void hangup() override {
|
|
||||||
finish(Status::Error("Cancelled"));
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
void tear_down() override {
|
|
||||||
finish(Status::OK());
|
|
||||||
}
|
|
||||||
|
|
||||||
void loop() override {
|
|
||||||
auto status = ping_connection_->flush();
|
|
||||||
if (status.is_error()) {
|
|
||||||
finish(std::move(status));
|
|
||||||
return stop();
|
|
||||||
}
|
|
||||||
if (ping_connection_->was_pong()) {
|
|
||||||
finish(Status::OK());
|
|
||||||
return stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void timeout_expired() override {
|
|
||||||
finish(Status::Error("Pong timeout expired"));
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
void finish(Status status) {
|
|
||||||
auto raw_connection = ping_connection_->move_as_raw_connection();
|
|
||||||
if (!raw_connection) {
|
|
||||||
CHECK(!promise_);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
|
|
||||||
if (promise_) {
|
|
||||||
if (status.is_error()) {
|
|
||||||
if (raw_connection->stats_callback()) {
|
|
||||||
raw_connection->stats_callback()->on_error();
|
|
||||||
}
|
|
||||||
raw_connection->close();
|
|
||||||
promise_.set_error(std::move(status));
|
|
||||||
} else {
|
|
||||||
raw_connection->rtt_ = ping_connection_->rtt();
|
|
||||||
if (raw_connection->stats_callback()) {
|
|
||||||
raw_connection->stats_callback()->on_pong();
|
|
||||||
}
|
|
||||||
promise_.set_value(std::move(raw_connection));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (raw_connection->stats_callback()) {
|
|
||||||
raw_connection->stats_callback()->on_error();
|
|
||||||
}
|
|
||||||
raw_connection->close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
class ConnectionCreator::ProxyInfo {
|
class ConnectionCreator::ProxyInfo {
|
||||||
|
@ -602,17 +527,17 @@ void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::Transp
|
||||||
Promise<double> promise) {
|
Promise<double> promise) {
|
||||||
auto token = next_token();
|
auto token = next_token();
|
||||||
auto raw_connection = make_unique<mtproto::RawConnection>(std::move(socket_fd), std::move(transport_type), nullptr);
|
auto raw_connection = make_unique<mtproto::RawConnection>(std::move(socket_fd), std::move(transport_type), nullptr);
|
||||||
children_[token] = {false, create_actor<detail::PingActor>(
|
children_[token] = {
|
||||||
"PingActor", std::move(raw_connection),
|
false, create_ping_actor("", std::move(raw_connection), nullptr,
|
||||||
PromiseCreator::lambda([promise = std::move(promise)](
|
PromiseCreator::lambda([promise = std::move(promise)](
|
||||||
Result<unique_ptr<mtproto::RawConnection>> result) mutable {
|
Result<unique_ptr<mtproto::RawConnection>> result) mutable {
|
||||||
if (result.is_error()) {
|
if (result.is_error()) {
|
||||||
return promise.set_error(Status::Error(400, result.error().message()));
|
return promise.set_error(Status::Error(400, result.error().message()));
|
||||||
}
|
}
|
||||||
auto ping_time = result.ok()->rtt_;
|
auto ping_time = result.ok()->rtt_;
|
||||||
promise.set_value(std::move(ping_time));
|
promise.set_value(std::move(ping_time));
|
||||||
}),
|
}),
|
||||||
create_reference(token))};
|
create_reference(token))};
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConnectionCreator::set_active_proxy_id(int32 proxy_id, bool from_binlog) {
|
void ConnectionCreator::set_active_proxy_id(int32 proxy_id, bool from_binlog) {
|
||||||
|
@ -805,7 +730,8 @@ void ConnectionCreator::on_mtproto_error(size_t hash) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media,
|
void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media,
|
||||||
Promise<unique_ptr<mtproto::RawConnection>> promise, size_t hash) {
|
Promise<unique_ptr<mtproto::RawConnection>> promise, size_t hash,
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data) {
|
||||||
auto &client = clients_[hash];
|
auto &client = clients_[hash];
|
||||||
if (!client.inited) {
|
if (!client.inited) {
|
||||||
client.inited = true;
|
client.inited = true;
|
||||||
|
@ -813,6 +739,7 @@ void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only
|
||||||
client.dc_id = dc_id;
|
client.dc_id = dc_id;
|
||||||
client.allow_media_only = allow_media_only;
|
client.allow_media_only = allow_media_only;
|
||||||
client.is_media = is_media;
|
client.is_media = is_media;
|
||||||
|
client.auth_data = std::move(auth_data);
|
||||||
} else {
|
} else {
|
||||||
CHECK(client.hash == hash);
|
CHECK(client.hash == hash);
|
||||||
CHECK(client.dc_id == dc_id);
|
CHECK(client.dc_id == dc_id);
|
||||||
|
@ -1111,9 +1038,8 @@ void ConnectionCreator::client_create_raw_connection(Result<ConnectionData> r_co
|
||||||
if (check_mode) {
|
if (check_mode) {
|
||||||
VLOG(connections) << "Start check: " << debug_str;
|
VLOG(connections) << "Start check: " << debug_str;
|
||||||
auto token = next_token();
|
auto token = next_token();
|
||||||
children_[token] = {
|
children_[token] = {true, create_ping_actor(debug_str, std::move(raw_connection), nullptr, std::move(promise),
|
||||||
true, create_actor<detail::PingActor>(PSLICE() << "PingActor<" << debug_str << ">", std::move(raw_connection),
|
create_reference(token))};
|
||||||
std::move(promise), create_reference(token))};
|
|
||||||
} else {
|
} else {
|
||||||
promise.set_value(std::move(raw_connection));
|
promise.set_value(std::move(raw_connection));
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
#include "td/telegram/net/NetQuery.h"
|
#include "td/telegram/net/NetQuery.h"
|
||||||
#include "td/telegram/StateManager.h"
|
#include "td/telegram/StateManager.h"
|
||||||
|
|
||||||
|
#include "td/mtproto/AuthData.h"
|
||||||
#include "td/mtproto/TransportType.h"
|
#include "td/mtproto/TransportType.h"
|
||||||
|
|
||||||
#include "td/actor/actor.h"
|
#include "td/actor/actor.h"
|
||||||
|
@ -156,7 +157,8 @@ class ConnectionCreator : public NetQueryCallback {
|
||||||
void on_pong(size_t hash);
|
void on_pong(size_t hash);
|
||||||
void on_mtproto_error(size_t hash);
|
void on_mtproto_error(size_t hash);
|
||||||
void request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media,
|
void request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media,
|
||||||
Promise<unique_ptr<mtproto::RawConnection>> promise, size_t hash = 0);
|
Promise<unique_ptr<mtproto::RawConnection>> promise, size_t hash = 0,
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data = {});
|
||||||
void request_raw_connection_by_ip(IPAddress ip_address, Promise<unique_ptr<mtproto::RawConnection>> promise);
|
void request_raw_connection_by_ip(IPAddress ip_address, Promise<unique_ptr<mtproto::RawConnection>> promise);
|
||||||
|
|
||||||
void set_net_stats_callback(std::shared_ptr<NetStatsCallback> common_callback,
|
void set_net_stats_callback(std::shared_ptr<NetStatsCallback> common_callback,
|
||||||
|
@ -237,6 +239,7 @@ class ConnectionCreator : public NetQueryCallback {
|
||||||
DcId dc_id;
|
DcId dc_id;
|
||||||
bool allow_media_only;
|
bool allow_media_only;
|
||||||
bool is_media;
|
bool is_media;
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data;
|
||||||
};
|
};
|
||||||
std::map<size_t, ClientInfo> clients_;
|
std::map<size_t, ClientInfo> clients_;
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ class SessionCallback : public Session::Callback {
|
||||||
}
|
}
|
||||||
void request_raw_connection(Promise<unique_ptr<mtproto::RawConnection>> promise) override {
|
void request_raw_connection(Promise<unique_ptr<mtproto::RawConnection>> promise) override {
|
||||||
send_closure(G()->connection_creator(), &ConnectionCreator::request_raw_connection, dc_id_, allow_media_only_,
|
send_closure(G()->connection_creator(), &ConnectionCreator::request_raw_connection, dc_id_, allow_media_only_,
|
||||||
is_media_, std::move(promise), hash_);
|
is_media_, std::move(promise), hash_, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_tmp_auth_key_updated(mtproto::AuthKey auth_key) override {
|
void on_tmp_auth_key_updated(mtproto::AuthKey auth_key) override {
|
||||||
|
|
124
test/mtproto.cpp
124
test/mtproto.cpp
|
@ -14,6 +14,7 @@
|
||||||
#include "td/mtproto/Handshake.h"
|
#include "td/mtproto/Handshake.h"
|
||||||
#include "td/mtproto/HandshakeActor.h"
|
#include "td/mtproto/HandshakeActor.h"
|
||||||
#include "td/mtproto/PingConnection.h"
|
#include "td/mtproto/PingConnection.h"
|
||||||
|
#include "td/mtproto/Ping.h"
|
||||||
#include "td/mtproto/RawConnection.h"
|
#include "td/mtproto/RawConnection.h"
|
||||||
#include "td/mtproto/TransportType.h"
|
#include "td/mtproto/TransportType.h"
|
||||||
|
|
||||||
|
@ -22,6 +23,7 @@
|
||||||
#include "td/net/TransparentProxy.h"
|
#include "td/net/TransparentProxy.h"
|
||||||
|
|
||||||
#include "td/telegram/ConfigManager.h"
|
#include "td/telegram/ConfigManager.h"
|
||||||
|
#include "td/telegram/net/Session.h"
|
||||||
#include "td/telegram/net/DcId.h"
|
#include "td/telegram/net/DcId.h"
|
||||||
#include "td/telegram/net/PublicRsaKeyShared.h"
|
#include "td/telegram/net/PublicRsaKeyShared.h"
|
||||||
#include "td/telegram/NotificationManager.h"
|
#include "td/telegram/NotificationManager.h"
|
||||||
|
@ -170,7 +172,7 @@ class TestPingActor : public Actor {
|
||||||
Status *result_;
|
Status *result_;
|
||||||
|
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
ping_connection_ = make_unique<mtproto::PingConnection>(
|
ping_connection_ = mtproto::PingConnection::create_req_pq(
|
||||||
make_unique<mtproto::RawConnection>(SocketFd::open(ip_address_).move_as_ok(),
|
make_unique<mtproto::RawConnection>(SocketFd::open(ip_address_).move_as_ok(),
|
||||||
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr),
|
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr),
|
||||||
3);
|
3);
|
||||||
|
@ -181,7 +183,6 @@ class TestPingActor : public Actor {
|
||||||
}
|
}
|
||||||
void tear_down() override {
|
void tear_down() override {
|
||||||
Scheduler::unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref());
|
Scheduler::unsubscribe_before_close(ping_connection_->get_poll_info().get_pollable_fd_ref());
|
||||||
ping_connection_->close();
|
|
||||||
Scheduler::instance()->finish();
|
Scheduler::instance()->finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -463,3 +464,122 @@ TEST(Mtproto, notifications) {
|
||||||
ASSERT_EQ(decrypted_payload, NotificationManager::decrypt_push(key_id, key, push).ok());
|
ASSERT_EQ(decrypted_payload, NotificationManager::decrypt_push(key_id, key, push).ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class FastPingTestActor : public Actor {
|
||||||
|
public:
|
||||||
|
explicit FastPingTestActor(Status *result) : result_(result) {
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Status *result_;
|
||||||
|
unique_ptr<mtproto::RawConnection> connection_;
|
||||||
|
unique_ptr<mtproto::AuthKeyHandshake> handshake_;
|
||||||
|
ActorOwn<> fast_ping_;
|
||||||
|
int iteration_{0};
|
||||||
|
|
||||||
|
void start_up() override {
|
||||||
|
// Run handshake to create key and salt
|
||||||
|
auto raw_connection =
|
||||||
|
make_unique<mtproto::RawConnection>(SocketFd::open(get_default_ip_address()).move_as_ok(),
|
||||||
|
mtproto::TransportType{mtproto::TransportType::Tcp, 0, ""}, nullptr);
|
||||||
|
auto handshake = make_unique<mtproto::AuthKeyHandshake>(get_default_dc_id(), 60 * 100 /*temp*/);
|
||||||
|
create_actor<mtproto::HandshakeActor>(
|
||||||
|
"HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique<HandshakeContext>(), 10.0,
|
||||||
|
PromiseCreator::lambda([self = actor_id(this)](Result<unique_ptr<mtproto::RawConnection>> raw_connection) {
|
||||||
|
send_closure(self, &FastPingTestActor::got_connection, std::move(raw_connection), 1);
|
||||||
|
}),
|
||||||
|
PromiseCreator::lambda([self = actor_id(this)](Result<unique_ptr<mtproto::AuthKeyHandshake>> handshake) {
|
||||||
|
send_closure(self, &FastPingTestActor::got_handshake, std::move(handshake), 1);
|
||||||
|
}))
|
||||||
|
.release();
|
||||||
|
}
|
||||||
|
void got_connection(Result<unique_ptr<mtproto::RawConnection>> r_raw_connection, int32 dummy) {
|
||||||
|
if (r_raw_connection.is_error()) {
|
||||||
|
*result_ = r_raw_connection.move_as_error();
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
connection_ = r_raw_connection.move_as_ok();
|
||||||
|
loop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void got_handshake(Result<unique_ptr<mtproto::AuthKeyHandshake>> r_handshake, int32 dummy) {
|
||||||
|
if (r_handshake.is_error()) {
|
||||||
|
*result_ = r_handshake.move_as_error();
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
handshake_ = r_handshake.move_as_ok();
|
||||||
|
loop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void got_raw_connection(Result<unique_ptr<mtproto::RawConnection>> r_connection) {
|
||||||
|
if (r_connection.is_error()) {
|
||||||
|
Scheduler::instance()->finish();
|
||||||
|
*result_ = r_connection.move_as_error();
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
connection_ = r_connection.move_as_ok();
|
||||||
|
LOG(ERROR) << "RTT: " << connection_->rtt_;
|
||||||
|
connection_->rtt_ = 0;
|
||||||
|
loop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void loop() override {
|
||||||
|
if (handshake_ && connection_) {
|
||||||
|
LOG(ERROR) << iteration_;
|
||||||
|
if (iteration_ == 6) {
|
||||||
|
Scheduler::instance()->finish();
|
||||||
|
return stop();
|
||||||
|
}
|
||||||
|
unique_ptr<mtproto::AuthData> auth_data;
|
||||||
|
if (iteration_ % 2 == 0) {
|
||||||
|
auth_data = make_unique<mtproto::AuthData>();
|
||||||
|
auth_data->set_tmp_auth_key(handshake_->auth_key);
|
||||||
|
auth_data->set_server_time_difference(handshake_->server_time_diff);
|
||||||
|
auth_data->set_server_salt(handshake_->server_salt, Time::now());
|
||||||
|
auth_data->set_future_salts({mtproto::ServerSalt{0u, 1e20, 1e30}}, Time::now());
|
||||||
|
auth_data->set_use_pfs(true);
|
||||||
|
uint64 session_id = 0;
|
||||||
|
do {
|
||||||
|
Random::secure_bytes(reinterpret_cast<uint8 *>(&session_id), sizeof(session_id));
|
||||||
|
} while (session_id == 0);
|
||||||
|
auth_data->set_session_id(session_id);
|
||||||
|
}
|
||||||
|
iteration_++;
|
||||||
|
fast_ping_ = create_ping_actor(
|
||||||
|
"", std::move(connection_), std::move(auth_data),
|
||||||
|
PromiseCreator::lambda([self = actor_id(this)](Result<unique_ptr<mtproto::RawConnection>> r_raw_connection) {
|
||||||
|
send_closure(self, &FastPingTestActor::got_raw_connection, std::move(r_raw_connection));
|
||||||
|
}),
|
||||||
|
ActorShared<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class Mtproto_FastPing : public Test {
|
||||||
|
public:
|
||||||
|
using Test::Test;
|
||||||
|
bool step() final {
|
||||||
|
if (!is_inited_) {
|
||||||
|
sched_.init(0);
|
||||||
|
sched_.create_actor_unsafe<FastPingTestActor>(0, "FastPingTestActor", &result_).release();
|
||||||
|
sched_.start();
|
||||||
|
is_inited_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ret = sched_.run_main(10);
|
||||||
|
if (ret) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
sched_.finish();
|
||||||
|
if (result_.is_error()) {
|
||||||
|
LOG(ERROR) << result_;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool is_inited_ = false;
|
||||||
|
ConcurrentScheduler sched_;
|
||||||
|
Status result_;
|
||||||
|
};
|
||||||
|
RegisterTest<Mtproto_FastPing> mtproto_fastping("Mtproto_FastPing");
|
||||||
|
|
Loading…
Reference in New Issue
Block a user