Add mtproto::ConnectionManager.

This commit is contained in:
levlam 2021-09-16 19:09:39 +03:00
parent 7d26a30bd0
commit 74315e2e60
11 changed files with 134 additions and 82 deletions

View File

@ -258,6 +258,7 @@ set(TL_DOTNET_SCHEME_SOURCE
set(TDLIB_SOURCE
td/mtproto/AuthData.cpp
td/mtproto/ConnectionManager.cpp
td/mtproto/DhHandshake.cpp
td/mtproto/Handshake.cpp
td/mtproto/HandshakeActor.cpp
@ -422,6 +423,7 @@ set(TDLIB_SOURCE
td/mtproto/AuthData.h
td/mtproto/AuthKey.h
td/mtproto/ConnectionManager.h
td/mtproto/CryptoStorer.h
td/mtproto/DhCallback.h
td/mtproto/DhHandshake.h

View File

@ -0,0 +1,37 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/mtproto/ConnectionManager.h"
namespace td {
namespace mtproto {
void ConnectionManager::inc_connect() {
auto &cnt = get_link_token() == 1 ? connect_cnt_ : connect_proxy_cnt_;
cnt++;
if (cnt == 1) {
loop();
}
}
void ConnectionManager::dec_connect() {
auto &cnt = get_link_token() == 1 ? connect_cnt_ : connect_proxy_cnt_;
CHECK(cnt > 0);
cnt--;
if (cnt == 0) {
loop();
}
}
ConnectionManager::ConnectionToken ConnectionManager::connection_impl(ActorId<ConnectionManager> connection_manager,
int mode) {
auto actor = ActorShared<ConnectionManager>(connection_manager, mode);
send_closure(actor, &ConnectionManager::inc_connect);
return ConnectionToken(std::move(actor));
}
} // namespace mtproto
} // namespace td

View File

@ -0,0 +1,70 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/actor.h"
#include "td/utils/common.h"
namespace td {
namespace mtproto {
class ConnectionManager : public Actor {
public:
class ConnectionToken {
public:
ConnectionToken() = default;
explicit ConnectionToken(ActorShared<ConnectionManager> connection_manager)
: connection_manager_(std::move(connection_manager)) {
}
ConnectionToken(const ConnectionToken &) = delete;
ConnectionToken &operator=(const ConnectionToken &) = delete;
ConnectionToken(ConnectionToken &&) = default;
ConnectionToken &operator=(ConnectionToken &&other) {
reset();
connection_manager_ = std::move(other.connection_manager_);
return *this;
}
~ConnectionToken() {
reset();
}
void reset() {
if (!connection_manager_.empty()) {
send_closure(connection_manager_, &ConnectionManager::dec_connect);
connection_manager_.reset();
}
}
bool empty() const {
return connection_manager_.empty();
}
private:
ActorShared<ConnectionManager> connection_manager_;
};
static ConnectionToken connection(ActorId<ConnectionManager> connection_manager) {
return connection_impl(connection_manager, 1);
}
static ConnectionToken connection_proxy(ActorId<ConnectionManager> connection_manager) {
return connection_impl(connection_manager, 2);
}
protected:
uint32 connect_cnt_ = 0;
uint32 connect_proxy_cnt_ = 0;
private:
void inc_connect();
void dec_connect();
static ConnectionToken connection_impl(ActorId<ConnectionManager> connection_manager, int mode);
};
} // namespace mtproto
} // namespace td

View File

@ -42,7 +42,7 @@ class RawConnectionDefault final : public RawConnection {
transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer());
}
void set_connection_token(StateManager::ConnectionToken connection_token) final {
void set_connection_token(ConnectionManager::ConnectionToken connection_token) final {
connection_token_ = std::move(connection_token);
}
@ -135,7 +135,7 @@ class RawConnectionDefault final : public RawConnection {
unique_ptr<StatsCallback> stats_callback_;
StateManager::ConnectionToken connection_token_;
ConnectionManager::ConnectionToken connection_token_;
Status flush_read(const AuthKey &auth_key, Callback &callback) {
auto r = socket_fd_.flush_read();
@ -268,7 +268,7 @@ class RawConnectionHttp final : public RawConnection {
answers_->init();
}
void set_connection_token(StateManager::ConnectionToken connection_token) final {
void set_connection_token(ConnectionManager::ConnectionToken connection_token) final {
connection_token_ = std::move(connection_token);
}
@ -348,7 +348,7 @@ class RawConnectionHttp final : public RawConnection {
unique_ptr<StatsCallback> stats_callback_;
StateManager::ConnectionToken connection_token_;
ConnectionManager::ConnectionToken connection_token_;
std::shared_ptr<MpscPollableQueue<Result<BufferSlice>>> answers_;
std::vector<BufferSlice> to_send_;

View File

@ -6,8 +6,7 @@
//
#pragma once
#include "td/telegram/StateManager.h"
#include "td/mtproto/ConnectionManager.h"
#include "td/mtproto/PacketInfo.h"
#include "td/mtproto/TransportType.h"
@ -44,7 +43,7 @@ class RawConnection {
static unique_ptr<RawConnection> create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type,
unique_ptr<StatsCallback> stats_callback);
virtual void set_connection_token(StateManager::ConnectionToken connection_token) = 0;
virtual void set_connection_token(ConnectionManager::ConnectionToken connection_token) = 0;
virtual bool can_send() const = 0;
virtual TransportType get_transport_type() const = 0;

View File

@ -15,22 +15,6 @@
namespace td {
void StateManager::inc_connect() {
auto &cnt = get_link_token() == 1 ? connect_cnt_ : connect_proxy_cnt_;
cnt++;
if (cnt == 1) {
loop();
}
}
void StateManager::dec_connect() {
auto &cnt = get_link_token() == 1 ? connect_cnt_ : connect_proxy_cnt_;
CHECK(cnt > 0);
cnt--;
if (cnt == 0) {
loop();
}
}
void StateManager::on_synchronized(bool is_synchronized) {
if (sync_flag_ != is_synchronized) {
sync_flag_ = is_synchronized;

View File

@ -6,17 +6,19 @@
//
#pragma once
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/telegram/ConnectionState.h"
#include "td/telegram/net/NetType.h"
#include "td/mtproto/ConnectionManager.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/common.h"
namespace td {
class StateManager final : public Actor {
class StateManager final : public mtproto::ConnectionManager {
public:
class Callback {
public:
@ -59,49 +61,8 @@ class StateManager final : public Actor {
void close();
class ConnectionToken {
public:
ConnectionToken() = default;
explicit ConnectionToken(ActorShared<StateManager> state_manager) : state_manager_(std::move(state_manager)) {
}
ConnectionToken(const ConnectionToken &) = delete;
ConnectionToken &operator=(const ConnectionToken &) = delete;
ConnectionToken(ConnectionToken &&) = default;
ConnectionToken &operator=(ConnectionToken &&other) {
reset();
state_manager_ = std::move(other.state_manager_);
return *this;
}
~ConnectionToken() {
reset();
}
void reset() {
if (!state_manager_.empty()) {
send_closure(state_manager_, &StateManager::dec_connect);
state_manager_.reset();
}
}
bool empty() const {
return state_manager_.empty();
}
private:
ActorShared<StateManager> state_manager_;
};
static ConnectionToken connection(ActorId<StateManager> state_manager) {
return connection_impl(state_manager, 1);
}
static ConnectionToken connection_proxy(ActorId<StateManager> state_manager) {
return connection_impl(state_manager, 2);
}
private:
ActorShared<> parent_;
uint32 connect_cnt_ = 0;
uint32 connect_proxy_cnt_ = 0;
bool sync_flag_ = true;
bool network_flag_ = true;
NetType network_type_ = NetType::Unknown;
@ -136,12 +97,6 @@ class StateManager final : public Actor {
void do_on_network(NetType new_network_type, bool inc_generation);
ConnectionState get_real_state() const;
static ConnectionToken connection_impl(ActorId<StateManager> state_manager, int mode) {
auto actor = ActorShared<StateManager>(state_manager, mode);
send_closure(actor, &StateManager::inc_connect);
return ConnectionToken(std::move(actor));
}
};
} // namespace td

View File

@ -761,7 +761,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd
void set_result(Result<SocketFd> result) final {
if (result.is_error()) {
if (use_connection_token_) {
connection_token_ = StateManager::ConnectionToken();
connection_token_ = mtproto::ConnectionManager::ConnectionToken();
}
if (was_connected_ && stats_callback_) {
stats_callback_->on_error();
@ -778,14 +778,15 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd
}
void on_connected() final {
if (use_connection_token_) {
connection_token_ = StateManager::connection_proxy(G()->state_manager());
connection_token_ = mtproto::ConnectionManager::connection_proxy(
static_cast<ActorId<mtproto::ConnectionManager>>(G()->state_manager()));
}
was_connected_ = true;
}
private:
Promise<ConnectionData> promise_;
StateManager::ConnectionToken connection_token_;
mtproto::ConnectionManager::ConnectionToken connection_token_;
IPAddress ip_address_;
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback_;
bool use_connection_token_;

View File

@ -13,9 +13,9 @@
#include "td/telegram/net/DcOptionsSet.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/Proxy.h"
#include "td/telegram/StateManager.h"
#include "td/mtproto/AuthData.h"
#include "td/mtproto/ConnectionManager.h"
#include "td/mtproto/RawConnection.h"
#include "td/mtproto/TransportType.h"
@ -82,7 +82,7 @@ class ConnectionCreator final : public NetQueryCallback {
struct ConnectionData {
IPAddress ip_address;
SocketFd socket_fd;
StateManager::ConnectionToken connection_token;
mtproto::ConnectionManager::ConnectionToken connection_token;
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback;
};

View File

@ -435,9 +435,11 @@ void Session::raw_event(const Event::Raw &event) {
/** Connection::Callback **/
void Session::on_connected() {
if (is_main_) {
connection_token_ = StateManager::connection(G()->state_manager());
connection_token_ =
mtproto::ConnectionManager::connection(static_cast<ActorId<mtproto::ConnectionManager>>(G()->state_manager()));
}
}
Status Session::on_pong() {
constexpr int MAX_QUERY_TIMEOUT = 60;
constexpr int MIN_CONNECTION_ACTIVE = 60;
@ -468,9 +470,11 @@ Status Session::on_pong() {
}
return Status::OK();
}
void Session::on_auth_key_updated() {
shared_auth_data_->set_auth_key(auth_data_.get_main_auth_key());
}
void Session::on_tmp_auth_key_updated() {
callback_->on_tmp_auth_key_updated(auth_data_.get_tmp_auth_key());
}

View File

@ -9,10 +9,10 @@
#include "td/telegram/net/AuthDataShared.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/TempAuthKeyWatchdog.h"
#include "td/telegram/StateManager.h"
#include "td/mtproto/AuthData.h"
#include "td/mtproto/AuthKey.h"
#include "td/mtproto/ConnectionManager.h"
#include "td/mtproto/Handshake.h"
#include "td/mtproto/SessionConnection.h"
@ -153,7 +153,7 @@ class Session final
ConnectionInfo *current_info_;
ConnectionInfo main_connection_;
ConnectionInfo long_poll_connection_;
StateManager::ConnectionToken connection_token_;
mtproto::ConnectionManager::ConnectionToken connection_token_;
double cached_connection_timestamp_ = 0;
unique_ptr<mtproto::RawConnection> cached_connection_;