Better ping of main DC.
GitOrigin-RevId: f3231b9b8f558cfceef6d2b952b4f1f7051cb396
This commit is contained in:
parent
d774b6612f
commit
8253460dbb
@ -101,6 +101,12 @@ class PingActor : public Actor {
|
|||||||
set_timeout_in(10);
|
set_timeout_in(10);
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void hangup() override {
|
||||||
|
finish(Status::Error("Cancelled"));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
void tear_down() override {
|
void tear_down() override {
|
||||||
finish(Status::OK());
|
finish(Status::OK());
|
||||||
}
|
}
|
||||||
@ -393,12 +399,43 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise<double> promise) {
|
|||||||
if (proxy_id == 0) {
|
if (proxy_id == 0) {
|
||||||
ProxyInfo proxy{nullptr, IPAddress()};
|
ProxyInfo proxy{nullptr, IPAddress()};
|
||||||
auto main_dc_id = G()->net_query_dispatcher().main_dc_id();
|
auto main_dc_id = G()->net_query_dispatcher().main_dc_id();
|
||||||
FindConnectionExtra extra;
|
auto infos = dc_options_set_.find_all_connections(main_dc_id, false, false);
|
||||||
auto r_socket_fd = find_connection(proxy, main_dc_id, false, extra);
|
if (infos.empty()) {
|
||||||
if (r_socket_fd.is_error()) {
|
return promise.set_error(Status::Error(400, "Can't find valid DC address"));
|
||||||
return promise.set_error(Status::Error(400, r_socket_fd.error().message()));
|
|
||||||
}
|
}
|
||||||
return ping_proxy_socket_fd(r_socket_fd.move_as_ok(), extra.transport_type, std::move(promise));
|
const size_t MAX_CONNECTIONS = 10;
|
||||||
|
if (infos.size() > MAX_CONNECTIONS) {
|
||||||
|
infos.resize(MAX_CONNECTIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto token = next_token();
|
||||||
|
auto &request = ping_main_dc_requests_[token];
|
||||||
|
request.promise = std::move(promise);
|
||||||
|
request.left_queries = infos.size();
|
||||||
|
request.result = Status::Error(400, "Failed to ping");
|
||||||
|
|
||||||
|
for (auto &info : infos) {
|
||||||
|
auto r_transport_type = get_transport_type(ProxyInfo{nullptr, IPAddress()}, info);
|
||||||
|
if (r_transport_type.is_error()) {
|
||||||
|
LOG(ERROR) << r_transport_type.error();
|
||||||
|
on_ping_main_dc_result(token, r_transport_type.move_as_error());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto r_socket_fd = SocketFd::open(info.option->get_ip_address());
|
||||||
|
if (r_socket_fd.is_error()) {
|
||||||
|
LOG(DEBUG) << "Failed to open socket: " << r_socket_fd.error();
|
||||||
|
on_ping_main_dc_result(token, r_socket_fd.move_as_error());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ping_proxy_socket_fd(r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(),
|
||||||
|
PromiseCreator::lambda([actor_id = actor_id(this), token](Result<double> result) {
|
||||||
|
send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token,
|
||||||
|
std::move(result));
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto it = proxies_.find(proxy_id);
|
auto it = proxies_.find(proxy_id);
|
||||||
@ -1277,4 +1314,29 @@ void ConnectionCreator::on_proxy_resolved(Result<IPAddress> r_ip_address, bool d
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ConnectionCreator::on_ping_main_dc_result(uint64 token, Result<double> result) {
|
||||||
|
auto &request = ping_main_dc_requests_[token];
|
||||||
|
CHECK(request.left_queries > 0);
|
||||||
|
if (result.is_error()) {
|
||||||
|
LOG(DEBUG) << "Receive ping error " << result.error();
|
||||||
|
if (request.result.is_error()) {
|
||||||
|
request.result = std::move(result);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG(DEBUG) << "Receive ping result " << result.ok();
|
||||||
|
if (request.result.is_error() || request.result.ok() > result.ok()) {
|
||||||
|
request.result = result.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (--request.left_queries == 0) {
|
||||||
|
if (request.result.is_error()) {
|
||||||
|
request.promise.set_error(Status::Error(400, request.result.error().message()));
|
||||||
|
} else {
|
||||||
|
request.promise.set_value(request.result.move_as_ok());
|
||||||
|
}
|
||||||
|
ping_main_dc_requests_.erase(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -222,7 +222,14 @@ class ConnectionCreator : public NetQueryCallback {
|
|||||||
ActorShared<ConnectionCreator> create_reference(int64 token);
|
ActorShared<ConnectionCreator> create_reference(int64 token);
|
||||||
bool close_flag_{false};
|
bool close_flag_{false};
|
||||||
uint64 current_token_ = 0;
|
uint64 current_token_ = 0;
|
||||||
std::map<int64, std::pair<bool, ActorShared<>>> children_;
|
std::map<uint64, std::pair<bool, ActorShared<>>> children_;
|
||||||
|
|
||||||
|
struct PingMainDcRequest {
|
||||||
|
Promise<double> promise;
|
||||||
|
size_t left_queries = 0;
|
||||||
|
Result<double> result;
|
||||||
|
};
|
||||||
|
std::map<uint64, PingMainDcRequest> ping_main_dc_requests_;
|
||||||
|
|
||||||
uint64 next_token() {
|
uint64 next_token() {
|
||||||
return ++current_token_;
|
return ++current_token_;
|
||||||
@ -295,6 +302,8 @@ class ConnectionCreator : public NetQueryCallback {
|
|||||||
void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise<double> promise);
|
void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise<double> promise);
|
||||||
|
|
||||||
void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, Promise<double> promise);
|
void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, Promise<double> promise);
|
||||||
|
|
||||||
|
void on_ping_main_dc_result(uint64 token, Result<double> result);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
Reference in New Issue
Block a user