c65b998cd6
GitOrigin-RevId: 4a3c6283026ba15836cb7fa011c4b3ad21a64baa
853 lines
29 KiB
C++
853 lines
29 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/telegram/net/ConnectionCreator.h"
|
|
|
|
#include "td/telegram/telegram_api.h"
|
|
|
|
#include "td/telegram/ConfigManager.h"
|
|
#include "td/telegram/Global.h"
|
|
#include "td/telegram/logevent/LogEvent.h"
|
|
#include "td/telegram/MessagesManager.h"
|
|
#include "td/telegram/net/MtprotoHeader.h"
|
|
#include "td/telegram/net/NetQueryDispatcher.h"
|
|
#include "td/telegram/StateManager.h"
|
|
|
|
#include "td/mtproto/IStreamTransport.h"
|
|
#include "td/mtproto/PingConnection.h"
|
|
#include "td/mtproto/RawConnection.h"
|
|
|
|
#include "td/net/GetHostByNameActor.h"
|
|
#include "td/net/Socks5.h"
|
|
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/port/IPAddress.h"
|
|
#include "td/utils/ScopeGuard.h"
|
|
#include "td/utils/Time.h"
|
|
#include "td/utils/tl_helpers.h"
|
|
|
|
#include <algorithm>
|
|
|
|
namespace td {
|
|
|
|
namespace detail {
|
|
|
|
class StatsCallback final : public mtproto::RawConnection::StatsCallback {
|
|
public:
|
|
StatsCallback(std::shared_ptr<NetStatsCallback> net_stats_callback, ActorId<ConnectionCreator> connection_creator,
|
|
size_t hash, DcOptionsSet::Stat *option_stat)
|
|
: net_stats_callback_(std::move(net_stats_callback))
|
|
, connection_creator_(std::move(connection_creator))
|
|
, hash_(hash)
|
|
, option_stat_(option_stat) {
|
|
}
|
|
|
|
void on_read(uint64 bytes) final {
|
|
net_stats_callback_->on_read(bytes);
|
|
}
|
|
void on_write(uint64 bytes) final {
|
|
net_stats_callback_->on_write(bytes);
|
|
}
|
|
|
|
void on_pong() final {
|
|
if (option_stat_) {
|
|
send_lambda(connection_creator_, [stat = option_stat_] { stat->on_ok(); });
|
|
}
|
|
}
|
|
void on_error() final {
|
|
if (option_stat_) {
|
|
send_lambda(connection_creator_, [stat = option_stat_] { stat->on_error(); });
|
|
}
|
|
}
|
|
|
|
void on_mtproto_error() final {
|
|
send_closure(connection_creator_, &ConnectionCreator::on_mtproto_error, hash_);
|
|
}
|
|
|
|
private:
|
|
std::shared_ptr<NetStatsCallback> net_stats_callback_;
|
|
ActorId<ConnectionCreator> connection_creator_;
|
|
size_t hash_;
|
|
DcOptionsSet::Stat *option_stat_;
|
|
};
|
|
|
|
class PingActor : public Actor {
|
|
public:
|
|
PingActor(std::unique_ptr<mtproto::RawConnection> raw_connection,
|
|
Promise<std::unique_ptr<mtproto::RawConnection>> promise, ActorShared<> parent)
|
|
: promise_(std::move(promise)), parent_(std::move(parent)) {
|
|
ping_connection_ = std::make_unique<mtproto::PingConnection>(std::move(raw_connection));
|
|
}
|
|
|
|
private:
|
|
std::unique_ptr<mtproto::PingConnection> ping_connection_;
|
|
Promise<std::unique_ptr<mtproto::RawConnection>> promise_;
|
|
ActorShared<> parent_;
|
|
double start_at_;
|
|
|
|
void start_up() override {
|
|
ping_connection_->get_pollable().set_observer(this);
|
|
subscribe(ping_connection_->get_pollable());
|
|
start_at_ = Time::now();
|
|
set_timeout_in(10);
|
|
yield();
|
|
}
|
|
void tear_down() override {
|
|
finish(Status::OK());
|
|
}
|
|
|
|
void loop() override {
|
|
auto status = ping_connection_->flush();
|
|
if (status.is_error()) {
|
|
finish(std::move(status));
|
|
return stop();
|
|
}
|
|
if (ping_connection_->was_pong()) {
|
|
finish(Status::OK());
|
|
return stop();
|
|
}
|
|
}
|
|
|
|
void timeout_expired() override {
|
|
finish(Status::Error("Pong timeout expired"));
|
|
stop();
|
|
}
|
|
|
|
void finish(Status status) {
|
|
auto raw_connection = ping_connection_->move_as_raw_connection();
|
|
if (!raw_connection) {
|
|
CHECK(!promise_);
|
|
return;
|
|
}
|
|
unsubscribe(raw_connection->get_pollable());
|
|
raw_connection->get_pollable().set_observer(nullptr);
|
|
if (promise_) {
|
|
if (status.is_error()) {
|
|
if (raw_connection->stats_callback()) {
|
|
raw_connection->stats_callback()->on_error();
|
|
}
|
|
raw_connection->close();
|
|
promise_.set_error(std::move(status));
|
|
} else {
|
|
raw_connection->rtt_ = Time::now() - start_at_;
|
|
if (raw_connection->stats_callback()) {
|
|
raw_connection->stats_callback()->on_pong();
|
|
}
|
|
promise_.set_value(std::move(raw_connection));
|
|
}
|
|
} else {
|
|
if (raw_connection->stats_callback()) {
|
|
raw_connection->stats_callback()->on_error();
|
|
}
|
|
raw_connection->close();
|
|
}
|
|
}
|
|
};
|
|
|
|
} // namespace detail
|
|
|
|
template <class T>
|
|
void Proxy::parse(T &parser) {
|
|
using td::parse;
|
|
parse(type_, parser);
|
|
if (type_ == Proxy::Type::Socks5) {
|
|
parse(server_, parser);
|
|
parse(port_, parser);
|
|
parse(user_, parser);
|
|
parse(password_, parser);
|
|
} else if (type_ == Proxy::Type::Mtproto) {
|
|
parse(server_, parser);
|
|
parse(port_, parser);
|
|
parse(secret_, parser);
|
|
} else {
|
|
CHECK(type_ == Proxy::Type::None);
|
|
}
|
|
}
|
|
|
|
template <class T>
|
|
void Proxy::store(T &storer) const {
|
|
using td::store;
|
|
store(type_, storer);
|
|
if (type_ == Proxy::Type::Socks5) {
|
|
store(server_, storer);
|
|
store(port_, storer);
|
|
store(user_, storer);
|
|
store(password_, storer);
|
|
} else if (type_ == Proxy::Type::Mtproto) {
|
|
store(server_, storer);
|
|
store(port_, storer);
|
|
store(secret_, storer);
|
|
} else {
|
|
CHECK(type_ == Proxy::Type::None);
|
|
}
|
|
}
|
|
|
|
ConnectionCreator::ClientInfo::ClientInfo() {
|
|
flood_control.add_limit(1, 1);
|
|
flood_control.add_limit(4, 2);
|
|
flood_control.add_limit(8, 3);
|
|
|
|
flood_control_online.add_limit(1, 4);
|
|
flood_control_online.add_limit(5, 5);
|
|
|
|
mtproto_error_flood_control.add_limit(1, 1);
|
|
mtproto_error_flood_control.add_limit(4, 2);
|
|
mtproto_error_flood_control.add_limit(8, 3);
|
|
}
|
|
|
|
ConnectionCreator::ConnectionCreator(ActorShared<> parent) : parent_(std::move(parent)) {
|
|
}
|
|
|
|
ConnectionCreator::ConnectionCreator(ConnectionCreator &&other) = default;
|
|
|
|
ConnectionCreator &ConnectionCreator::operator=(ConnectionCreator &&other) = default;
|
|
|
|
ConnectionCreator::~ConnectionCreator() = default;
|
|
|
|
void ConnectionCreator::set_net_stats_callback(std::shared_ptr<NetStatsCallback> common_callback,
|
|
std::shared_ptr<NetStatsCallback> media_callback) {
|
|
common_net_stats_callback_ = std::move(common_callback);
|
|
media_net_stats_callback_ = std::move(media_callback);
|
|
}
|
|
|
|
void ConnectionCreator::set_proxy(Proxy proxy) {
|
|
set_proxy_impl(std::move(proxy), false);
|
|
loop();
|
|
}
|
|
|
|
void ConnectionCreator::set_proxy_impl(Proxy proxy, bool from_db) {
|
|
auto have_proxy = proxy.type() != Proxy::Type::None;
|
|
if (proxy_ == proxy) {
|
|
if (!have_proxy) {
|
|
on_get_proxy_info(make_tl_object<telegram_api::help_proxyDataEmpty>(0));
|
|
}
|
|
return;
|
|
}
|
|
|
|
proxy_ = std::move(proxy);
|
|
G()->mtproto_header().set_proxy(proxy_);
|
|
send_closure(G()->state_manager(), &StateManager::on_proxy, have_proxy);
|
|
|
|
if (!from_db) {
|
|
if (have_proxy) {
|
|
G()->td_db()->get_binlog_pmc()->set("proxy", log_event_store(proxy_).as_slice().str());
|
|
} else {
|
|
G()->td_db()->get_binlog_pmc()->erase("proxy");
|
|
}
|
|
for (auto &child : children_) {
|
|
child.second.reset();
|
|
}
|
|
}
|
|
|
|
resolve_proxy_query_token_ = 0;
|
|
resolve_proxy_timestamp_ = Timestamp();
|
|
proxy_ip_address_ = IPAddress();
|
|
|
|
get_proxy_info_query_token_ = 0;
|
|
get_proxy_info_timestamp_ = Timestamp();
|
|
if (!have_proxy || !from_db) {
|
|
on_get_proxy_info(make_tl_object<telegram_api::help_proxyDataEmpty>(0));
|
|
} else {
|
|
schedule_get_proxy_info(0);
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::get_proxy(Promise<Proxy> promise) {
|
|
promise.set_value(Proxy(proxy_));
|
|
}
|
|
|
|
void ConnectionCreator::on_network(bool network_flag, uint32 network_generation) {
|
|
network_flag_ = network_flag;
|
|
auto old_generation = network_generation_;
|
|
network_generation_ = network_generation;
|
|
if (network_flag_) {
|
|
resolve_proxy_query_token_ = 0;
|
|
resolve_proxy_timestamp_ = Timestamp();
|
|
get_proxy_info_timestamp_ = Timestamp();
|
|
for (auto &client : clients_) {
|
|
client.second.backoff.clear();
|
|
client.second.flood_control.clear_events();
|
|
client.second.flood_control_online.clear_events();
|
|
client_loop(client.second);
|
|
}
|
|
|
|
if (old_generation != network_generation_) {
|
|
loop();
|
|
}
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::on_online(bool online_flag) {
|
|
online_flag_ = online_flag;
|
|
if (online_flag_) {
|
|
for (auto &client : clients_) {
|
|
client.second.backoff.clear();
|
|
client.second.flood_control_online.clear_events();
|
|
client_loop(client.second);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::on_mtproto_error(size_t hash) {
|
|
auto &client = clients_[hash];
|
|
client.hash = hash;
|
|
client.mtproto_error_flood_control.add_event(static_cast<int32>(Time::now_cached()));
|
|
}
|
|
|
|
void ConnectionCreator::request_raw_connection(DcId dc_id, bool allow_media_only, bool is_media,
|
|
Promise<std::unique_ptr<mtproto::RawConnection>> promise, size_t hash) {
|
|
auto &client = clients_[hash];
|
|
if (!client.inited) {
|
|
client.inited = true;
|
|
client.hash = hash;
|
|
client.dc_id = dc_id;
|
|
client.allow_media_only = allow_media_only;
|
|
client.is_media = is_media;
|
|
} else {
|
|
CHECK(client.hash == hash);
|
|
CHECK(client.dc_id == dc_id);
|
|
CHECK(client.allow_media_only == allow_media_only);
|
|
CHECK(client.is_media == is_media);
|
|
}
|
|
VLOG(connections) << tag("client", format::as_hex(client.hash)) << " " << dc_id << " "
|
|
<< tag("allow_media_only", allow_media_only);
|
|
client.queries.push_back(std::move(promise));
|
|
|
|
client_loop(client);
|
|
}
|
|
|
|
void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address,
|
|
Promise<std::unique_ptr<mtproto::RawConnection>> promise) {
|
|
auto r_socket_fd = SocketFd::open(ip_address);
|
|
if (r_socket_fd.is_error()) {
|
|
return promise.set_error(r_socket_fd.move_as_error());
|
|
}
|
|
auto raw_connection = std::make_unique<mtproto::RawConnection>(
|
|
r_socket_fd.move_as_ok(), mtproto::TransportType{mtproto::TransportType::ObfuscatedTcp, 0, ""}, nullptr);
|
|
raw_connection->extra_ = network_generation_;
|
|
promise.set_value(std::move(raw_connection));
|
|
}
|
|
|
|
void ConnectionCreator::client_loop(ClientInfo &client) {
|
|
CHECK(client.hash != 0);
|
|
if (!network_flag_) {
|
|
return;
|
|
}
|
|
if (close_flag_) {
|
|
return;
|
|
}
|
|
auto proxy_type = proxy_.type();
|
|
bool use_proxy = proxy_type != Proxy::Type::None;
|
|
bool use_socks5_proxy = proxy_type == Proxy::Type::Socks5;
|
|
bool use_mtproto_proxy = proxy_type == Proxy::Type::Mtproto;
|
|
if (use_proxy && !proxy_ip_address_.is_valid()) {
|
|
return;
|
|
}
|
|
|
|
VLOG(connections) << "client_loop: " << tag("client", format::as_hex(client.hash));
|
|
|
|
// Remove expired ready connections
|
|
client.ready_connections.erase(
|
|
std::remove_if(client.ready_connections.begin(), client.ready_connections.end(),
|
|
[&, expire_at = Time::now_cached() - ClientInfo::READY_CONNECTIONS_TIMEOUT](auto &v) {
|
|
bool drop = v.second < expire_at;
|
|
VLOG_IF(connections, drop) << "Drop expired " << tag("connection", v.first.get());
|
|
return drop;
|
|
}),
|
|
client.ready_connections.end());
|
|
|
|
// Send ready connections into promises
|
|
{
|
|
auto begin = client.queries.begin();
|
|
auto it = begin;
|
|
while (it != client.queries.end() && !client.ready_connections.empty()) {
|
|
VLOG(connections) << "Send to promise " << tag("connection", client.ready_connections.back().first.get());
|
|
it->set_value(std::move(client.ready_connections.back().first));
|
|
client.ready_connections.pop_back();
|
|
++it;
|
|
}
|
|
client.queries.erase(begin, it);
|
|
}
|
|
|
|
// Main loop. Create new connections till needed
|
|
bool check_mode = client.checking_connections != 0 && !use_proxy;
|
|
while (true) {
|
|
// Check if we need new connections
|
|
if (client.queries.empty()) {
|
|
if (!client.ready_connections.empty()) {
|
|
client_set_timeout_at(client, Time::now() + ClientInfo::READY_CONNECTIONS_TIMEOUT);
|
|
}
|
|
return;
|
|
}
|
|
if (check_mode) {
|
|
if (client.checking_connections >= 3) {
|
|
return;
|
|
}
|
|
} else {
|
|
if (client.pending_connections >= client.queries.size()) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Check flood
|
|
auto &flood_control = online_flag_ ? client.flood_control_online : client.flood_control;
|
|
auto wakeup_at = max(flood_control.get_wakeup_at(), client.mtproto_error_flood_control.get_wakeup_at());
|
|
if (!online_flag_) {
|
|
wakeup_at = max(wakeup_at, client.backoff.get_wakeup_at());
|
|
}
|
|
if (wakeup_at > Time::now()) {
|
|
return client_set_timeout_at(client, wakeup_at);
|
|
}
|
|
flood_control.add_event(static_cast<int32>(Time::now()));
|
|
if (!online_flag_) {
|
|
client.backoff.add_event(static_cast<int32>(Time::now()));
|
|
}
|
|
|
|
// Create new RawConnection
|
|
DcOptionsSet::Stat *stat{nullptr};
|
|
mtproto::TransportType transport_type;
|
|
string debug_str;
|
|
|
|
IPAddress mtproto_ip;
|
|
|
|
// sync part
|
|
auto r_socket_fd = [&, dc_id = client.dc_id, allow_media_only = client.allow_media_only]() -> Result<SocketFd> {
|
|
if (use_mtproto_proxy) {
|
|
TRY_RESULT(info, dc_options_set_.find_connection(dc_id, allow_media_only, use_proxy));
|
|
stat = nullptr;
|
|
int16 raw_dc_id = narrow_cast<int16>(info.option->is_media_only() ? -dc_id.get_raw_id() : dc_id.get_raw_id());
|
|
TRY_RESULT(secret, hex_decode(proxy_.secret()));
|
|
transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, std::move(secret)};
|
|
|
|
debug_str = PSTRING() << "Mtproto " << proxy_ip_address_ << " to DC" << raw_dc_id;
|
|
LOG(INFO) << "Create: " << debug_str;
|
|
return SocketFd::open(proxy_ip_address_);
|
|
}
|
|
|
|
TRY_RESULT(info, dc_options_set_.find_connection(dc_id, allow_media_only, use_proxy));
|
|
stat = info.stat;
|
|
if (info.use_http) {
|
|
transport_type = {mtproto::TransportType::Http, 0, ""};
|
|
} else {
|
|
int16 raw_dc_id = narrow_cast<int16>(info.option->is_media_only() ? -dc_id.get_raw_id() : dc_id.get_raw_id());
|
|
transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, info.option->get_secret().str()};
|
|
}
|
|
check_mode |= info.should_check;
|
|
|
|
if (use_socks5_proxy) {
|
|
mtproto_ip = info.option->get_ip_address();
|
|
debug_str = PSTRING() << "Sock5 " << proxy_ip_address_ << " --> " << mtproto_ip << " " << dc_id;
|
|
LOG(INFO) << "Create: " << debug_str;
|
|
return SocketFd::open(proxy_ip_address_);
|
|
} else {
|
|
debug_str = PSTRING() << info.option->get_ip_address() << " " << dc_id << (info.use_http ? " HTTP" : "")
|
|
<< (info.option->is_media_only() ? " MEDIA" : "");
|
|
LOG(INFO) << "Create: " << debug_str;
|
|
return SocketFd::open(info.option->get_ip_address());
|
|
}
|
|
}();
|
|
if (r_socket_fd.is_error()) {
|
|
LOG(WARNING) << r_socket_fd.error();
|
|
if (stat) {
|
|
stat->on_error(); // TODO: different kind of error
|
|
}
|
|
return client_set_timeout_at(client, Time::now() + 0.1);
|
|
}
|
|
|
|
auto socket_fd = r_socket_fd.move_as_ok();
|
|
IPAddress debug_ip;
|
|
auto debug_ip_status = debug_ip.init_socket_address(socket_fd);
|
|
if (debug_ip_status.is_ok()) {
|
|
debug_str = PSTRING() << debug_str << " from " << debug_ip;
|
|
} else {
|
|
LOG(ERROR) << debug_ip_status;
|
|
}
|
|
|
|
client.pending_connections++;
|
|
if (check_mode) {
|
|
if (stat) {
|
|
stat->on_check();
|
|
}
|
|
client.checking_connections++;
|
|
}
|
|
|
|
auto promise = PromiseCreator::lambda(
|
|
[actor_id = actor_id(this), check_mode, transport_type, hash = client.hash, debug_str,
|
|
network_generation = network_generation_](Result<ConnectionData> r_connection_data) mutable {
|
|
send_closure(std::move(actor_id), &ConnectionCreator::client_create_raw_connection,
|
|
std::move(r_connection_data), check_mode, transport_type, hash, debug_str, network_generation);
|
|
});
|
|
|
|
auto stats_callback = std::make_unique<detail::StatsCallback>(
|
|
client.is_media ? media_net_stats_callback_ : common_net_stats_callback_, actor_id(this), client.hash, stat);
|
|
|
|
if (use_socks5_proxy) {
|
|
class Callback : public Socks5::Callback {
|
|
public:
|
|
explicit Callback(Promise<ConnectionData> promise, std::unique_ptr<detail::StatsCallback> stats_callback)
|
|
: promise_(std::move(promise)), stats_callback_(std::move(stats_callback)) {
|
|
}
|
|
void set_result(Result<SocketFd> result) override {
|
|
if (result.is_error()) {
|
|
connection_token_ = StateManager::ConnectionToken();
|
|
if (was_connected_) {
|
|
stats_callback_->on_error();
|
|
}
|
|
promise_.set_error(result.move_as_error());
|
|
} else {
|
|
ConnectionData data;
|
|
data.socket_fd = result.move_as_ok();
|
|
data.connection_token = std::move(connection_token_);
|
|
data.stats_callback = std::move(stats_callback_);
|
|
promise_.set_value(std::move(data));
|
|
}
|
|
}
|
|
void on_connected() override {
|
|
connection_token_ = StateManager::connection_proxy(G()->state_manager());
|
|
was_connected_ = true;
|
|
}
|
|
|
|
private:
|
|
Promise<ConnectionData> promise_;
|
|
StateManager::ConnectionToken connection_token_;
|
|
bool was_connected_{false};
|
|
std::unique_ptr<detail::StatsCallback> stats_callback_;
|
|
};
|
|
LOG(INFO) << "Start socks5: " << debug_str;
|
|
auto token = next_token();
|
|
children_[token] = create_actor<Socks5>(
|
|
"Socks5", std::move(socket_fd), mtproto_ip, proxy_.user().str(), proxy_.password().str(),
|
|
std::make_unique<Callback>(std::move(promise), std::move(stats_callback)), create_reference(token));
|
|
} else {
|
|
ConnectionData data;
|
|
data.socket_fd = std::move(socket_fd);
|
|
data.stats_callback = std::move(stats_callback);
|
|
promise.set_result(std::move(data));
|
|
}
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::client_create_raw_connection(Result<ConnectionData> r_connection_data, bool check_mode,
|
|
mtproto::TransportType transport_type, size_t hash,
|
|
string debug_str, uint32 network_generation) {
|
|
auto promise = PromiseCreator::lambda([actor_id = actor_id(this), hash, check_mode,
|
|
debug_str](Result<std::unique_ptr<mtproto::RawConnection>> result) mutable {
|
|
VLOG(connections) << "Ready " << debug_str << " " << tag("checked", check_mode) << tag("ok", result.is_ok());
|
|
send_closure(std::move(actor_id), &ConnectionCreator::client_add_connection, hash, std::move(result), check_mode);
|
|
});
|
|
|
|
if (r_connection_data.is_error()) {
|
|
return promise.set_error(r_connection_data.move_as_error());
|
|
}
|
|
|
|
auto connection_data = r_connection_data.move_as_ok();
|
|
auto raw_connection = std::make_unique<mtproto::RawConnection>(
|
|
std::move(connection_data.socket_fd), std::move(transport_type), std::move(connection_data.stats_callback));
|
|
raw_connection->set_connection_token(std::move(connection_data.connection_token));
|
|
|
|
raw_connection->extra_ = network_generation;
|
|
raw_connection->debug_str_ = debug_str;
|
|
|
|
if (check_mode) {
|
|
VLOG(connections) << "Start check: " << debug_str;
|
|
auto token = next_token();
|
|
children_[token] = create_actor<detail::PingActor>("PingActor", std::move(raw_connection), std::move(promise),
|
|
create_reference(token));
|
|
} else {
|
|
promise.set_value(std::move(raw_connection));
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::client_set_timeout_at(ClientInfo &client, double wakeup_at) {
|
|
if (!client.slot.has_event()) {
|
|
client.slot.set_event(self_closure(this, &ConnectionCreator::client_wakeup, client.hash));
|
|
}
|
|
client.slot.set_timeout_at(wakeup_at);
|
|
VLOG(connections) << tag("client", format::as_hex(client.hash)) << " set timeout in "
|
|
<< wakeup_at - Time::now_cached();
|
|
}
|
|
|
|
void ConnectionCreator::client_add_connection(size_t hash,
|
|
Result<std::unique_ptr<mtproto::RawConnection>> r_raw_connection,
|
|
bool check_flag) {
|
|
auto &client = clients_[hash];
|
|
CHECK(client.pending_connections > 0);
|
|
client.pending_connections--;
|
|
if (check_flag) {
|
|
CHECK(client.checking_connections > 0);
|
|
client.checking_connections--;
|
|
}
|
|
if (r_raw_connection.is_ok()) {
|
|
client.backoff.clear();
|
|
client.ready_connections.push_back(std::make_pair(r_raw_connection.move_as_ok(), Time::now_cached()));
|
|
}
|
|
client_loop(client);
|
|
}
|
|
|
|
void ConnectionCreator::client_wakeup(size_t hash) {
|
|
LOG(INFO) << tag("hash", format::as_hex(hash)) << " wakeup";
|
|
client_loop(clients_[hash]);
|
|
}
|
|
|
|
void ConnectionCreator::on_dc_options(DcOptions new_dc_options) {
|
|
LOG(INFO) << "SAVE " << new_dc_options;
|
|
G()->td_db()->get_binlog_pmc()->set("dc_options", serialize(new_dc_options));
|
|
dc_options_set_.reset();
|
|
dc_options_set_.add_dc_options(get_default_dc_options(G()->is_test_dc()));
|
|
#if !TD_EMSCRIPTEN // FIXME
|
|
dc_options_set_.add_dc_options(std::move(new_dc_options));
|
|
#endif
|
|
}
|
|
|
|
void ConnectionCreator::on_dc_update(DcId dc_id, string ip_port, Promise<> promise) {
|
|
promise.set_result([&]() -> Result<> {
|
|
if (!dc_id.is_exact()) {
|
|
return Status::Error("Invalid dc_id");
|
|
}
|
|
IPAddress ip_address;
|
|
TRY_STATUS(ip_address.init_host_port(ip_port));
|
|
DcOptions options;
|
|
options.dc_options.emplace_back(dc_id, ip_address);
|
|
send_closure(G()->config_manager(), &ConfigManager::on_dc_options_update, std::move(options));
|
|
return Unit();
|
|
}());
|
|
}
|
|
|
|
void ConnectionCreator::start_up() {
|
|
class StateCallback : public StateManager::Callback {
|
|
public:
|
|
explicit StateCallback(ActorId<ConnectionCreator> connection_creator)
|
|
: connection_creator_(std::move(connection_creator)) {
|
|
}
|
|
bool on_network(NetType network_type, uint32 generation) override {
|
|
send_closure(connection_creator_, &ConnectionCreator::on_network, network_type != NetType::None, generation);
|
|
return connection_creator_.is_alive();
|
|
}
|
|
bool on_online(bool online_flag) override {
|
|
send_closure(connection_creator_, &ConnectionCreator::on_online, online_flag);
|
|
return connection_creator_.is_alive();
|
|
}
|
|
|
|
private:
|
|
ActorId<ConnectionCreator> connection_creator_;
|
|
};
|
|
send_closure(G()->state_manager(), &StateManager::add_callback, make_unique<StateCallback>(actor_id(this)));
|
|
|
|
auto serialized_dc_options = G()->td_db()->get_binlog_pmc()->get("dc_options");
|
|
DcOptions dc_options;
|
|
auto status = unserialize(dc_options, serialized_dc_options);
|
|
if (status.is_error()) {
|
|
on_dc_options(DcOptions());
|
|
} else {
|
|
on_dc_options(std::move(dc_options));
|
|
}
|
|
|
|
Proxy proxy;
|
|
auto log_event_proxy = G()->td_db()->get_binlog_pmc()->get("proxy");
|
|
if (!log_event_proxy.empty()) {
|
|
log_event_parse(proxy, log_event_proxy).ensure();
|
|
}
|
|
set_proxy_impl(std::move(proxy), true);
|
|
|
|
get_host_by_name_actor_ =
|
|
create_actor_on_scheduler<GetHostByNameActor>("GetHostByNameActor", G()->get_gc_scheduler_id(), 5 * 60 - 1, 0);
|
|
|
|
ref_cnt_guard_ = create_reference(-1);
|
|
|
|
loop();
|
|
}
|
|
|
|
void ConnectionCreator::hangup_shared() {
|
|
ref_cnt_--;
|
|
children_.erase(get_link_token());
|
|
if (ref_cnt_ == 0) {
|
|
stop();
|
|
}
|
|
}
|
|
|
|
ActorShared<ConnectionCreator> ConnectionCreator::create_reference(int64 token) {
|
|
CHECK(token != 0);
|
|
ref_cnt_++;
|
|
return actor_shared(this, token);
|
|
}
|
|
|
|
void ConnectionCreator::hangup() {
|
|
close_flag_ = true;
|
|
ref_cnt_guard_.reset();
|
|
for (auto &child : children_) {
|
|
child.second.reset();
|
|
}
|
|
}
|
|
|
|
DcOptions ConnectionCreator::get_default_dc_options(bool is_test) {
|
|
DcOptions res;
|
|
auto add_ip_ports = [&res](int32 dc_id, const vector<string> &ips, const vector<int> &ports, bool is_ipv6 = false) {
|
|
IPAddress ip_address;
|
|
for (auto &ip : ips) {
|
|
for (auto port : ports) {
|
|
if (is_ipv6) {
|
|
ip_address.init_ipv6_port(ip, port).ensure();
|
|
} else {
|
|
ip_address.init_ipv4_port(ip, port).ensure();
|
|
}
|
|
res.dc_options.emplace_back(DcId::internal(dc_id), ip_address);
|
|
}
|
|
}
|
|
};
|
|
vector<int> ports = {443, 80, 5222};
|
|
if (is_test) {
|
|
add_ip_ports(1, {"149.154.175.10"}, ports);
|
|
add_ip_ports(2, {"149.154.167.40"}, ports);
|
|
add_ip_ports(3, {"149.154.175.117"}, ports);
|
|
|
|
add_ip_ports(1, {"2001:b28:f23d:f001::e"}, ports, true);
|
|
add_ip_ports(2, {"2001:67c:4e8:f002::e"}, ports, true);
|
|
add_ip_ports(3, {"2001:b28:f23d:f003::e"}, ports, true);
|
|
} else {
|
|
add_ip_ports(1, {"149.154.175.50"}, ports);
|
|
add_ip_ports(2, {"149.154.167.51"}, ports);
|
|
add_ip_ports(3, {"149.154.175.100"}, ports);
|
|
add_ip_ports(4, {"149.154.167.91"}, ports);
|
|
add_ip_ports(5, {"149.154.171.5"}, ports);
|
|
|
|
add_ip_ports(1, {"2001:b28:f23d:f001::a"}, ports, true);
|
|
add_ip_ports(2, {"2001:67c:4e8:f002::a"}, ports, true);
|
|
add_ip_ports(3, {"2001:b28:f23d:f003::a"}, ports, true);
|
|
add_ip_ports(4, {"2001:67c:4e8:f004::a"}, ports, true);
|
|
add_ip_ports(5, {"2001:b28:f23f:f005::a"}, ports, true);
|
|
}
|
|
return res;
|
|
}
|
|
|
|
void ConnectionCreator::loop() {
|
|
if (!network_flag_) {
|
|
return;
|
|
}
|
|
|
|
Timestamp timeout;
|
|
if (proxy_.type() == Proxy::Type::Mtproto) {
|
|
if (get_proxy_info_timestamp_.is_in_past()) {
|
|
if (get_proxy_info_query_token_ == 0) {
|
|
get_proxy_info_query_token_ = next_token();
|
|
auto query = G()->net_query_creator().create(create_storer(telegram_api::help_getProxyData()));
|
|
G()->net_query_dispatcher().dispatch_with_callback(std::move(query),
|
|
actor_shared(this, get_proxy_info_query_token_));
|
|
}
|
|
} else {
|
|
CHECK(get_proxy_info_query_token_ == 0);
|
|
timeout.relax(get_proxy_info_timestamp_);
|
|
}
|
|
}
|
|
|
|
if (proxy_.type() != Proxy::Type::None) {
|
|
if (resolve_proxy_timestamp_.is_in_past()) {
|
|
if (resolve_proxy_query_token_ == 0) {
|
|
resolve_proxy_query_token_ = next_token();
|
|
send_closure(
|
|
get_host_by_name_actor_, &GetHostByNameActor::run, proxy_.server().str(), proxy_.port(),
|
|
PromiseCreator::lambda([actor_id = create_reference(resolve_proxy_query_token_)](Result<IPAddress> result) {
|
|
send_closure(std::move(actor_id), &ConnectionCreator::on_proxy_resolved, std::move(result), false);
|
|
}));
|
|
}
|
|
} else {
|
|
CHECK(resolve_proxy_query_token_ == 0);
|
|
timeout.relax(resolve_proxy_timestamp_);
|
|
}
|
|
}
|
|
|
|
if (timeout) {
|
|
set_timeout_at(timeout.at());
|
|
}
|
|
}
|
|
|
|
void ConnectionCreator::on_result(NetQueryPtr query) {
|
|
if (get_link_token() != get_proxy_info_query_token_) {
|
|
return;
|
|
}
|
|
|
|
SCOPE_EXIT {
|
|
loop();
|
|
};
|
|
|
|
get_proxy_info_query_token_ = 0;
|
|
auto res = fetch_result<telegram_api::help_getProxyData>(std::move(query));
|
|
if (res.is_error()) {
|
|
if (G()->close_flag()) {
|
|
return;
|
|
}
|
|
LOG(ERROR) << "Receive error for getProxyData: " << res.error();
|
|
return schedule_get_proxy_info(60);
|
|
}
|
|
on_get_proxy_info(res.move_as_ok());
|
|
}
|
|
|
|
void ConnectionCreator::on_get_proxy_info(telegram_api::object_ptr<telegram_api::help_ProxyData> proxy_data_ptr) {
|
|
CHECK(proxy_data_ptr != nullptr);
|
|
LOG(INFO) << "Receive " << to_string(proxy_data_ptr);
|
|
int32 expires = 0;
|
|
switch (proxy_data_ptr->get_id()) {
|
|
case telegram_api::help_proxyDataEmpty::ID: {
|
|
auto proxy = telegram_api::move_object_as<telegram_api::help_proxyDataEmpty>(proxy_data_ptr);
|
|
expires = proxy->expires_;
|
|
send_closure(G()->messages_manager(), &MessagesManager::on_get_promoted_dialog_id, nullptr,
|
|
vector<tl_object_ptr<telegram_api::User>>(), vector<tl_object_ptr<telegram_api::Chat>>());
|
|
break;
|
|
}
|
|
case telegram_api::help_proxyDataPromo::ID: {
|
|
auto proxy = telegram_api::move_object_as<telegram_api::help_proxyDataPromo>(proxy_data_ptr);
|
|
expires = proxy->expires_;
|
|
send_closure(G()->messages_manager(), &MessagesManager::on_get_promoted_dialog_id, std::move(proxy->peer_),
|
|
std::move(proxy->users_), std::move(proxy->chats_));
|
|
break;
|
|
}
|
|
default:
|
|
UNREACHABLE();
|
|
}
|
|
if (expires != 0) {
|
|
expires -= G()->unix_time();
|
|
}
|
|
schedule_get_proxy_info(expires);
|
|
}
|
|
|
|
void ConnectionCreator::schedule_get_proxy_info(int32 expires) {
|
|
if (expires < 0) {
|
|
LOG(ERROR) << "Receive wrong expires: " << expires;
|
|
expires = 0;
|
|
}
|
|
if (expires != 0 && expires < 60) {
|
|
expires = 60;
|
|
}
|
|
if (expires > 86400) {
|
|
expires = 86400;
|
|
}
|
|
get_proxy_info_timestamp_ = Timestamp::in(expires);
|
|
}
|
|
|
|
void ConnectionCreator::on_proxy_resolved(Result<IPAddress> r_ip_address, bool dummy) {
|
|
if (get_link_token() != resolve_proxy_query_token_) {
|
|
return;
|
|
}
|
|
|
|
SCOPE_EXIT {
|
|
loop();
|
|
};
|
|
|
|
resolve_proxy_query_token_ = 0;
|
|
if (r_ip_address.is_error()) {
|
|
resolve_proxy_timestamp_ = Timestamp::in(1 * 60);
|
|
return;
|
|
}
|
|
proxy_ip_address_ = r_ip_address.move_as_ok();
|
|
resolve_proxy_timestamp_ = Timestamp::in(5 * 60);
|
|
for (auto &client : clients_) {
|
|
client_loop(client.second);
|
|
}
|
|
}
|
|
|
|
} // namespace td
|