diff --git a/td/mtproto/RawConnection.cpp b/td/mtproto/RawConnection.cpp index c3fa421f4..77af4b90d 100644 --- a/td/mtproto/RawConnection.cpp +++ b/td/mtproto/RawConnection.cpp @@ -15,7 +15,6 @@ #include "td/net/DarwinHttp.h" #endif -#include "td/utils/BufferedFd.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/misc.h" @@ -35,8 +34,9 @@ namespace mtproto { class RawConnectionDefault final : public RawConnection { public: - RawConnectionDefault(SocketFd socket_fd, TransportType transport_type, unique_ptr stats_callback) - : socket_fd_(std::move(socket_fd)) + RawConnectionDefault(BufferedFd buffered_socket_fd, TransportType transport_type, + unique_ptr stats_callback) + : socket_fd_(std::move(buffered_socket_fd)) , transport_(create_transport(std::move(transport_type))) , stats_callback_(std::move(stats_callback)) { transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer()); @@ -450,12 +450,13 @@ class RawConnectionHttp final : public RawConnection { }; #endif -unique_ptr RawConnection::create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type, +unique_ptr RawConnection::create(IPAddress ip_address, BufferedFd buffered_socket_fd, + TransportType transport_type, unique_ptr stats_callback) { #if TD_DARWIN_WATCH_OS return td::make_unique(std::move(ip_address), std::move(stats_callback)); #else - return td::make_unique(std::move(socket_fd), std::move(transport_type), + return td::make_unique(std::move(buffered_socket_fd), std::move(transport_type), std::move(stats_callback)); #endif } diff --git a/td/mtproto/RawConnection.h b/td/mtproto/RawConnection.h index 98cee88e0..63f05b0a8 100644 --- a/td/mtproto/RawConnection.h +++ b/td/mtproto/RawConnection.h @@ -11,6 +11,7 @@ #include "td/mtproto/TransportType.h" #include "td/utils/buffer.h" +#include "td/utils/BufferedFd.h" #include "td/utils/common.h" #include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/IPAddress.h" @@ -40,8 +41,8 @@ class RawConnection { RawConnection &operator=(const RawConnection &) = delete; virtual ~RawConnection() = default; - static unique_ptr create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type, - unique_ptr stats_callback); + static unique_ptr create(IPAddress ip_address, BufferedFd buffered_socket_fd, + TransportType transport_type, unique_ptr stats_callback); virtual void set_connection_token(ConnectionManager::ConnectionToken connection_token) = 0; diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 82e198b43..92fe1cd2b 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -559,7 +559,7 @@ class TestProxyRequest final : public RequestOnceActor { auto handshake = make_unique(dc_id_, 3600); auto data = r_data.move_as_ok(); auto raw_connection = - mtproto::RawConnection::create(data.ip_address, std::move(data.socket_fd), get_transport(), nullptr); + mtproto::RawConnection::create(data.ip_address, std::move(data.buffered_socket_fd), get_transport(), nullptr); child_ = create_actor( "HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique(), 10.0, PromiseCreator::lambda([actor_id = actor_id(this)](Result> raw_connection) { diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 14700f18e..a4d927171 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -328,12 +328,12 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise promise) { continue; } - ping_proxy_socket_fd(std::move(ip_address), r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), - PSTRING() << info.option->get_ip_address(), - PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { - send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, - std::move(result)); - })); + ping_proxy_buffered_socket_fd(std::move(ip_address), BufferedFd(r_socket_fd.move_as_ok()), + r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(), + PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { + send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, + std::move(result)); + })); } return; } @@ -375,8 +375,9 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } - send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, ip_address, - r_connection_data.move_as_ok().socket_fd, std::move(transport_type), std::move(debug_str), + auto connection_data = r_connection_data.move_as_ok(); + send_closure(actor_id, &ConnectionCreator::ping_proxy_buffered_socket_fd, ip_address, + std::move(connection_data.buffered_socket_fd), std::move(transport_type), std::move(debug_str), std::move(promise)); }); CHECK(proxy.use_proxy()); @@ -389,12 +390,12 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address } } -void ConnectionCreator::ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, - mtproto::TransportType transport_type, string debug_str, - Promise promise) { +void ConnectionCreator::ping_proxy_buffered_socket_fd(IPAddress ip_address, BufferedFd buffered_socket_fd, + mtproto::TransportType transport_type, string debug_str, + Promise promise) { auto token = next_token(); auto raw_connection = - mtproto::RawConnection::create(ip_address, std::move(socket_fd), std::move(transport_type), nullptr); + mtproto::RawConnection::create(ip_address, std::move(buffered_socket_fd), std::move(transport_type), nullptr); children_[token] = { false, create_ping_actor(debug_str, std::move(raw_connection), nullptr, PromiseCreator::lambda([promise = std::move(promise)]( @@ -651,8 +652,9 @@ void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address, mtpro if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } - auto raw_connection = - mtproto::RawConnection::create(ip_address, r_connection_data.move_as_ok().socket_fd, transport_type, nullptr); + auto connection_data = r_connection_data.move_as_ok(); + auto raw_connection = mtproto::RawConnection::create(ip_address, std::move(connection_data.buffered_socket_fd), + transport_type, nullptr); raw_connection->extra().extra = network_generation; promise.set_value(std::move(raw_connection)); }); @@ -754,19 +756,19 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd , use_connection_token_(use_connection_token) , was_connected_(was_connected) { } - void set_result(Result result) final { - if (result.is_error()) { + void set_result(Result> r_buffered_socket_fd) final { + if (r_buffered_socket_fd.is_error()) { if (use_connection_token_) { connection_token_ = mtproto::ConnectionManager::ConnectionToken(); } if (was_connected_ && stats_callback_) { stats_callback_->on_error(); } - promise_.set_error(Status::Error(400, result.error().public_message())); + promise_.set_error(Status::Error(400, r_buffered_socket_fd.error().public_message())); } else { ConnectionData data; data.ip_address = ip_address_; - data.socket_fd = result.move_as_ok(); + data.buffered_socket_fd = r_buffered_socket_fd.move_as_ok(); data.connection_token = std::move(connection_token_); data.stats_callback = std::move(stats_callback_); promise_.set_value(std::move(data)); @@ -785,7 +787,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd mtproto::ConnectionManager::ConnectionToken connection_token_; IPAddress ip_address_; unique_ptr stats_callback_; - bool use_connection_token_; + bool use_connection_token_{false}; bool was_connected_{false}; }; VLOG(connections) << "Start " @@ -814,7 +816,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd ConnectionData data; data.ip_address = ip_address; - data.socket_fd = std::move(socket_fd); + data.buffered_socket_fd = BufferedFd(std::move(socket_fd)); data.stats_callback = std::move(stats_callback); promise.set_result(std::move(data)); return {}; @@ -991,7 +993,7 @@ void ConnectionCreator::client_create_raw_connection(Result r_co auto connection_data = r_connection_data.move_as_ok(); auto raw_connection = - mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.socket_fd), + mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.buffered_socket_fd), std::move(transport_type), std::move(connection_data.stats_callback)); raw_connection->set_connection_token(std::move(connection_data.connection_token)); diff --git a/td/telegram/net/ConnectionCreator.h b/td/telegram/net/ConnectionCreator.h index 5a5f69086..42561347c 100644 --- a/td/telegram/net/ConnectionCreator.h +++ b/td/telegram/net/ConnectionCreator.h @@ -25,6 +25,7 @@ #include "td/actor/PromiseFuture.h" #include "td/actor/SignalSlot.h" +#include "td/utils/BufferedFd.h" #include "td/utils/common.h" #include "td/utils/FloodControlStrict.h" #include "td/utils/logging.h" @@ -81,7 +82,7 @@ class ConnectionCreator final : public NetQueryCallback { struct ConnectionData { IPAddress ip_address; - SocketFd socket_fd; + BufferedFd buffered_socket_fd; mtproto::ConnectionManager::ConnectionToken connection_token; unique_ptr stats_callback; }; @@ -246,8 +247,8 @@ class ConnectionCreator final : public NetQueryCallback { void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise promise); - void ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, mtproto::TransportType transport_type, - string debug_str, Promise promise); + void ping_proxy_buffered_socket_fd(IPAddress ip_address, BufferedFd buffered_socket_fd, + mtproto::TransportType transport_type, string debug_str, Promise promise); void on_ping_main_dc_result(uint64 token, Result result); }; diff --git a/tdnet/td/net/TransparentProxy.h b/tdnet/td/net/TransparentProxy.h index 6bb40ab4b..deadb6010 100644 --- a/tdnet/td/net/TransparentProxy.h +++ b/tdnet/td/net/TransparentProxy.h @@ -28,7 +28,7 @@ class TransparentProxy : public Actor { Callback &operator=(const Callback &) = delete; virtual ~Callback() = default; - virtual void set_result(Result) = 0; + virtual void set_result(Result> r_buffered_socket_fd) = 0; virtual void on_connected() = 0; }; diff --git a/test/mtproto.cpp b/test/mtproto.cpp index d30b0b2d4..11fc2e724 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -217,7 +217,7 @@ class TestPingActor final : public Actor { } ping_connection_ = mtproto::PingConnection::create_req_pq( - mtproto::RawConnection::create(ip_address_, r_socket.move_as_ok(), + mtproto::RawConnection::create(ip_address_, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr), 3); @@ -339,7 +339,7 @@ class HandshakeTestActor final : public Actor { } raw_connection_ = mtproto::RawConnection::create( - ip_address, r_socket.move_as_ok(), + ip_address, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); } if (!wait_for_handshake_ && !handshake_) { @@ -438,22 +438,22 @@ RegisterTest mtproto_handshake("Mtproto_handshake"); class Socks5TestActor final : public Actor { public: void start_up() final { - auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result res) { + auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result> res) { send_closure(actor_id, &Socks5TestActor::on_result, std::move(res), false); }); class Callback final : public TransparentProxy::Callback { public: - explicit Callback(Promise promise) : promise_(std::move(promise)) { + explicit Callback(Promise> promise) : promise_(std::move(promise)) { } - void set_result(Result result) final { + void set_result(Result> result) final { promise_.set_result(std::move(result)); } void on_connected() final { } private: - Promise promise_; + Promise> promise_; }; IPAddress socks5_ip; @@ -470,7 +470,7 @@ class Socks5TestActor final : public Actor { } private: - void on_result(Result res, bool dummy) { + void on_result(Result> res, bool dummy) { res.ensure(); Scheduler::instance()->finish(); } @@ -545,7 +545,7 @@ class FastPingTestActor final : public Actor { } auto raw_connection = mtproto::RawConnection::create( - ip_address, r_socket.move_as_ok(), + ip_address, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); auto handshake = make_unique(get_default_dc_id(), 60 * 100 /*temp*/); create_actor( @@ -676,7 +676,7 @@ TEST(Mtproto, TlsTransport) { void start_up() final { class Callback final : public TransparentProxy::Callback { public: - void set_result(Result result) final { + void set_result(Result> result) final { if (result.is_ok()) { LOG(ERROR) << "Unexpectedly succeeded to connect to MTProto proxy"; } else if (result.error().message() != "Response hash mismatch") {