Ping proxy

GitOrigin-RevId: ff4dfa54bee356a06b1ec076711fb28b894221cb
This commit is contained in:
Arseny Smirnov 2018-05-18 23:43:38 +03:00
parent b3ca84fffb
commit 3675e38605
2 changed files with 175 additions and 64 deletions

View File

@ -155,6 +155,35 @@ class PingActor : public Actor {
} // namespace detail } // namespace detail
class ConnectionCreator::ProxyInfo {
public:
explicit ProxyInfo(Proxy *proxy, IPAddress ip_address) : proxy_(proxy), ip_address_(std::move(ip_address)) {
}
bool use_proxy() const {
return proxy_ != nullptr;
}
Proxy::Type proxy_type() const {
return proxy_ == nullptr ? Proxy::Type::None : proxy_->type();
}
bool use_socks5_proxy() const {
return proxy_type() == Proxy::Type::Socks5;
}
bool use_mtproto_proxy() const {
return proxy_type() == Proxy::Type::Mtproto;
}
Proxy &proxy() {
CHECK(use_proxy());
return *proxy_;
}
IPAddress &ip_address() {
return ip_address_;
}
private:
Proxy *proxy_;
IPAddress ip_address_;
};
template <class T> template <class T>
void Proxy::parse(T &parser) { void Proxy::parse(T &parser) {
using td::parse; using td::parse;
@ -361,12 +390,85 @@ void ConnectionCreator::get_proxy_link(int32 proxy_id, Promise<string> promise)
} }
void ConnectionCreator::ping_proxy(int32 proxy_id, Promise<double> promise) { void ConnectionCreator::ping_proxy(int32 proxy_id, Promise<double> promise) {
if (proxies_.count(proxy_id) == 0) { auto it = proxies_.find(proxy_id);
if (it == proxies_.end()) {
return promise.set_error(Status::Error(400, "Unknown proxy identifier")); return promise.set_error(Status::Error(400, "Unknown proxy identifier"));
} }
Proxy &proxy = it->second;
send_closure(get_host_by_name_actor_, &GetHostByNameActor::run, proxy.server().str(), proxy.port(),
PromiseCreator::lambda([actor_id = actor_id(this), promise = std::move(promise),
proxy_id](Result<IPAddress> result) mutable {
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
send_closure(actor_id, &ConnectionCreator::ping_proxy_resolved, proxy_id, result.move_as_ok(),
std::move(promise));
}));
}
// TODO ping void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise<double> promise) {
promise.set_value(0.0); auto it = proxies_.find(proxy_id);
if (it == proxies_.end()) {
return promise.set_error(Status::Error(400, "Unknown proxy identifier"));
}
ProxyInfo proxy(&it->second, ip_address);
auto main_dc_id = G()->net_query_dispatcher().main_dc_id();
FindConnectionExtra extra;
auto r_socket_fd = find_connection(proxy, main_dc_id, false, extra);
if (r_socket_fd.is_error()) {
return promise.set_error(r_socket_fd.move_as_error());
}
auto socket_fd = r_socket_fd.move_as_ok();
auto socket_fd_promise =
PromiseCreator::lambda([promise = std::move(promise), actor_id = actor_id(this),
transport_type = std::move(extra.transport_type)](Result<SocketFd> r_socket_fd) mutable {
if (r_socket_fd.is_error()) {
return promise.set_error(r_socket_fd.move_as_error());
}
send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, r_socket_fd.move_as_ok(),
std::move(transport_type), std::move(promise));
});
CHECK(proxy.use_proxy());
if (proxy.use_socks5_proxy()) {
class Callback : public Socks5::Callback {
public:
explicit Callback(Promise<SocketFd> promise) : promise_(std::move(promise)) {
}
void set_result(Result<SocketFd> result) override {
promise_.set_result(std::move(result));
}
void on_connected() override {
}
private:
Promise<SocketFd> promise_;
};
LOG(INFO) << "Start socks5: " << extra.debug_str;
auto token = next_token();
children_[token] = create_actor<Socks5>(
"Socks5", std::move(socket_fd), extra.mtproto_ip, proxy.proxy().user().str(), proxy.proxy().password().str(),
std::make_unique<Callback>(std::move(socket_fd_promise)), create_reference(token));
} else {
socket_fd_promise.set_value(std::move(socket_fd));
}
}
void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type,
Promise<double> promise) {
auto token = next_token();
auto raw_connection =
std::make_unique<mtproto::RawConnection>(std::move(socket_fd), std::move(transport_type), nullptr);
children_[token] = create_actor<detail::PingActor>(
"PingActor", std::move(raw_connection),
PromiseCreator::lambda([start = Time::now(), promise = std::move(promise)](
Result<std::unique_ptr<mtproto::RawConnection>> result) mutable {
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
promise.set_value(Time::now() - start);
}),
create_reference(token));
} }
void ConnectionCreator::enable_proxy_impl(int32 proxy_id) { void ConnectionCreator::enable_proxy_impl(int32 proxy_id) {
@ -562,6 +664,45 @@ void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address,
promise.set_value(std::move(raw_connection)); promise.set_value(std::move(raw_connection));
} }
Result<SocketFd> ConnectionCreator::find_connection(ConnectionCreator::ProxyInfo &proxy, DcId dc_id,
bool allow_media_only, FindConnectionExtra &extra) {
TRY_RESULT(info, dc_options_set_.find_connection(dc_id, allow_media_only, proxy.use_proxy()));
extra.stat = info.stat;
int32 int_dc_id = dc_id.get_raw_id();
if (G()->is_test_dc()) {
int_dc_id += 10000;
}
int16 raw_dc_id = narrow_cast<int16>(info.option->is_media_only() ? -int_dc_id : int_dc_id);
if (proxy.use_mtproto_proxy()) {
TRY_RESULT(secret, hex_decode(proxy.proxy().secret()));
extra.transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, std::move(secret)};
extra.debug_str = PSTRING() << "Mtproto " << proxy.ip_address() << " to DC" << raw_dc_id;
LOG(INFO) << "Create: " << extra.debug_str;
return SocketFd::open(proxy.ip_address());
}
if (info.use_http) {
extra.transport_type = {mtproto::TransportType::Http, 0, ""};
} else {
extra.transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, info.option->get_secret().str()};
}
extra.check_mode |= info.should_check;
if (proxy.use_socks5_proxy()) {
extra.mtproto_ip = info.option->get_ip_address();
extra.debug_str = PSTRING() << "Socks5 " << proxy.ip_address() << " --> " << extra.mtproto_ip << " " << dc_id;
LOG(INFO) << "Create: " << extra.debug_str;
return SocketFd::open(proxy.ip_address());
} else {
extra.debug_str = PSTRING() << info.option->get_ip_address() << " " << dc_id << (info.use_http ? " HTTP" : "")
<< (info.option->is_media_only() ? " MEDIA" : "");
LOG(INFO) << "Create: " << extra.debug_str;
return SocketFd::open(info.option->get_ip_address());
}
}
void ConnectionCreator::client_loop(ClientInfo &client) { void ConnectionCreator::client_loop(ClientInfo &client) {
CHECK(client.hash != 0); CHECK(client.hash != 0);
if (!network_flag_) { if (!network_flag_) {
@ -571,12 +712,8 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
return; return;
} }
Proxy *proxy = active_proxy_id_ == 0 ? nullptr : &proxies_[active_proxy_id_]; ProxyInfo proxy{active_proxy_id_ == 0 ? nullptr : &proxies_[active_proxy_id_], proxy_ip_address_};
auto proxy_type = proxy == nullptr ? Proxy::Type::None : proxy->type(); if (proxy.use_proxy() && !proxy.ip_address().is_valid()) {
bool use_proxy = proxy != nullptr;
bool use_socks5_proxy = proxy_type == Proxy::Type::Socks5;
bool use_mtproto_proxy = proxy_type == Proxy::Type::Mtproto;
if (use_proxy && !proxy_ip_address_.is_valid()) {
return; return;
} }
@ -606,7 +743,7 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
} }
// Main loop. Create new connections till needed // Main loop. Create new connections till needed
bool check_mode = client.checking_connections != 0 && !use_proxy; bool check_mode = client.checking_connections != 0 && !proxy.use_proxy();
while (true) { while (true) {
// Check if we need new connections // Check if we need new connections
if (client.queries.empty()) { if (client.queries.empty()) {
@ -640,55 +777,15 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
} }
// Create new RawConnection // Create new RawConnection
DcOptionsSet::Stat *stat{nullptr}; //TODO
mtproto::TransportType transport_type;
string debug_str;
IPAddress mtproto_ip;
// sync part // sync part
auto allow_media_only = client.allow_media_only; FindConnectionExtra extra;
auto r_socket_fd = [&, dc_id = client.dc_id]() -> Result<SocketFd> { auto r_socket_fd = find_connection(proxy, client.dc_id, client.allow_media_only, extra);
TRY_RESULT(info, dc_options_set_.find_connection(dc_id, allow_media_only, use_proxy)); check_mode |= extra.check_mode;
stat = info.stat;
int32 int_dc_id = dc_id.get_raw_id();
if (G()->is_test_dc()) {
int_dc_id += 10000;
}
int16 raw_dc_id = narrow_cast<int16>(info.option->is_media_only() ? -int_dc_id : int_dc_id);
if (use_mtproto_proxy) {
TRY_RESULT(secret, hex_decode(proxy->secret()));
transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, std::move(secret)};
debug_str = PSTRING() << "Mtproto " << proxy_ip_address_ << " to DC" << raw_dc_id;
LOG(INFO) << "Create: " << debug_str;
return SocketFd::open(proxy_ip_address_);
}
if (info.use_http) {
transport_type = {mtproto::TransportType::Http, 0, ""};
} else {
transport_type = {mtproto::TransportType::ObfuscatedTcp, raw_dc_id, info.option->get_secret().str()};
}
check_mode |= info.should_check;
if (use_socks5_proxy) {
mtproto_ip = info.option->get_ip_address();
debug_str = PSTRING() << "Socks5 " << proxy_ip_address_ << " --> " << mtproto_ip << " " << dc_id;
LOG(INFO) << "Create: " << debug_str;
return SocketFd::open(proxy_ip_address_);
} else {
debug_str = PSTRING() << info.option->get_ip_address() << " " << dc_id << (info.use_http ? " HTTP" : "")
<< (info.option->is_media_only() ? " MEDIA" : "");
LOG(INFO) << "Create: " << debug_str;
return SocketFd::open(info.option->get_ip_address());
}
}();
if (r_socket_fd.is_error()) { if (r_socket_fd.is_error()) {
LOG(WARNING) << r_socket_fd.error(); LOG(WARNING) << r_socket_fd.error();
if (stat) { if (extra.stat) {
stat->on_error(); // TODO: different kind of error extra.stat->on_error(); // TODO: different kind of error
} }
return client_set_timeout_at(client, Time::now() + 0.1); return client_set_timeout_at(client, Time::now() + 0.1);
} }
@ -697,30 +794,32 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
IPAddress debug_ip; IPAddress debug_ip;
auto debug_ip_status = debug_ip.init_socket_address(socket_fd); auto debug_ip_status = debug_ip.init_socket_address(socket_fd);
if (debug_ip_status.is_ok()) { if (debug_ip_status.is_ok()) {
debug_str = PSTRING() << debug_str << " from " << debug_ip; extra.debug_str = PSTRING() << extra.debug_str << " from " << debug_ip;
} else { } else {
LOG(ERROR) << debug_ip_status; LOG(ERROR) << debug_ip_status;
} }
client.pending_connections++; client.pending_connections++;
if (check_mode) { if (check_mode) {
if (stat) { if (extra.stat) {
stat->on_check(); extra.stat->on_check();
} }
client.checking_connections++; client.checking_connections++;
} }
auto promise = PromiseCreator::lambda( auto promise = PromiseCreator::lambda(
[actor_id = actor_id(this), check_mode, transport_type, hash = client.hash, debug_str, [actor_id = actor_id(this), check_mode, transport_type = extra.transport_type, hash = client.hash,
debug_str = extra.debug_str,
network_generation = network_generation_](Result<ConnectionData> r_connection_data) mutable { network_generation = network_generation_](Result<ConnectionData> r_connection_data) mutable {
send_closure(std::move(actor_id), &ConnectionCreator::client_create_raw_connection, send_closure(std::move(actor_id), &ConnectionCreator::client_create_raw_connection,
std::move(r_connection_data), check_mode, transport_type, hash, debug_str, network_generation); std::move(r_connection_data), check_mode, transport_type, hash, debug_str, network_generation);
}); });
auto stats_callback = std::make_unique<detail::StatsCallback>( auto stats_callback = std::make_unique<detail::StatsCallback>(
client.is_media ? media_net_stats_callback_ : common_net_stats_callback_, actor_id(this), client.hash, stat); client.is_media ? media_net_stats_callback_ : common_net_stats_callback_, actor_id(this), client.hash,
extra.stat);
if (use_socks5_proxy) { if (proxy.use_socks5_proxy()) {
class Callback : public Socks5::Callback { class Callback : public Socks5::Callback {
public: public:
explicit Callback(Promise<ConnectionData> promise, std::unique_ptr<detail::StatsCallback> stats_callback) explicit Callback(Promise<ConnectionData> promise, std::unique_ptr<detail::StatsCallback> stats_callback)
@ -752,10 +851,10 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
bool was_connected_{false}; bool was_connected_{false};
std::unique_ptr<detail::StatsCallback> stats_callback_; std::unique_ptr<detail::StatsCallback> stats_callback_;
}; };
LOG(INFO) << "Start socks5: " << debug_str; LOG(INFO) << "Start socks5: " << extra.debug_str;
auto token = next_token(); auto token = next_token();
children_[token] = create_actor<Socks5>( children_[token] = create_actor<Socks5>(
"Socks5", std::move(socket_fd), mtproto_ip, proxy->user().str(), proxy->password().str(), "Socks5", std::move(socket_fd), extra.mtproto_ip, proxy.proxy().user().str(), proxy.proxy().password().str(),
std::make_unique<Callback>(std::move(promise), std::move(stats_callback)), create_reference(token)); std::make_unique<Callback>(std::move(promise), std::move(stats_callback)), create_reference(token));
} else { } else {
ConnectionData data; ConnectionData data;

View File

@ -276,6 +276,18 @@ class ConnectionCreator : public NetQueryCallback {
void on_proxy_resolved(Result<IPAddress> ip_address, bool dummy); void on_proxy_resolved(Result<IPAddress> ip_address, bool dummy);
static DcOptions get_default_dc_options(bool is_test); static DcOptions get_default_dc_options(bool is_test);
struct FindConnectionExtra {
DcOptionsSet::Stat *stat{nullptr};
mtproto::TransportType transport_type;
string debug_str;
IPAddress mtproto_ip;
bool check_mode{false};
};
class ProxyInfo;
Result<SocketFd> find_connection(ProxyInfo &proxy, DcId dc_id, bool allow_media_only, FindConnectionExtra &extra);
void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise<double> promise);
void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, Promise<double> promise);
}; };
} // namespace td } // namespace td