diff --git a/td/mtproto/AuthData.cpp b/td/mtproto/AuthData.cpp index dedca5031..00c946f99 100644 --- a/td/mtproto/AuthData.cpp +++ b/td/mtproto/AuthData.cpp @@ -120,7 +120,7 @@ int64 AuthData::next_message_id(double now) { bool AuthData::is_valid_outbound_msg_id(int64 id, double now) const { double server_time = get_server_time(now); auto id_time = static_cast(id) / static_cast(1ll << 32); - return server_time - 300 / 2 < id_time && id_time < server_time + 30; + return server_time - 150 < id_time && id_time < server_time + 30; } bool AuthData::is_valid_inbound_msg_id(int64 id, double now) const { diff --git a/td/mtproto/RawConnection.cpp b/td/mtproto/RawConnection.cpp index c3fa421f4..77af4b90d 100644 --- a/td/mtproto/RawConnection.cpp +++ b/td/mtproto/RawConnection.cpp @@ -15,7 +15,6 @@ #include "td/net/DarwinHttp.h" #endif -#include "td/utils/BufferedFd.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/misc.h" @@ -35,8 +34,9 @@ namespace mtproto { class RawConnectionDefault final : public RawConnection { public: - RawConnectionDefault(SocketFd socket_fd, TransportType transport_type, unique_ptr stats_callback) - : socket_fd_(std::move(socket_fd)) + RawConnectionDefault(BufferedFd buffered_socket_fd, TransportType transport_type, + unique_ptr stats_callback) + : socket_fd_(std::move(buffered_socket_fd)) , transport_(create_transport(std::move(transport_type))) , stats_callback_(std::move(stats_callback)) { transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer()); @@ -450,12 +450,13 @@ class RawConnectionHttp final : public RawConnection { }; #endif -unique_ptr RawConnection::create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type, +unique_ptr RawConnection::create(IPAddress ip_address, BufferedFd buffered_socket_fd, + TransportType transport_type, unique_ptr stats_callback) { #if TD_DARWIN_WATCH_OS return td::make_unique(std::move(ip_address), std::move(stats_callback)); #else - return td::make_unique(std::move(socket_fd), std::move(transport_type), + return td::make_unique(std::move(buffered_socket_fd), std::move(transport_type), std::move(stats_callback)); #endif } diff --git a/td/mtproto/RawConnection.h b/td/mtproto/RawConnection.h index 98cee88e0..63f05b0a8 100644 --- a/td/mtproto/RawConnection.h +++ b/td/mtproto/RawConnection.h @@ -11,6 +11,7 @@ #include "td/mtproto/TransportType.h" #include "td/utils/buffer.h" +#include "td/utils/BufferedFd.h" #include "td/utils/common.h" #include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/IPAddress.h" @@ -40,8 +41,8 @@ class RawConnection { RawConnection &operator=(const RawConnection &) = delete; virtual ~RawConnection() = default; - static unique_ptr create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type, - unique_ptr stats_callback); + static unique_ptr create(IPAddress ip_address, BufferedFd buffered_socket_fd, + TransportType transport_type, unique_ptr stats_callback); virtual void set_connection_token(ConnectionManager::ConnectionToken connection_token) = 0; diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 93976e701..55884f2be 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -23151,40 +23151,43 @@ tl_object_ptr MessagesManager::get_message_object(DialogId dial } else { ttl = 0; } + auto sender = get_message_sender_object_const(m->sender_user_id, m->sender_dialog_id, source); auto scheduling_state = is_scheduled ? get_message_scheduling_state_object(m->date) : nullptr; - bool can_be_edited = for_event_log ? false : can_edit_message(dialog_id, m, false, td_->auth_manager_->is_bot()); - bool can_be_forwarded = for_event_log ? false : can_forward_message(dialog_id, m); - bool can_get_statistics = for_event_log ? false : can_get_message_statistics(dialog_id, m); - bool can_get_message_thread = for_event_log ? false : get_top_thread_full_message_id(dialog_id, m).is_ok(); - bool can_get_viewers = for_event_log ? false : can_get_message_viewers(dialog_id, m).is_ok(); - bool can_get_media_timestamp_links = for_event_log ? false : can_get_media_timestamp_link(dialog_id, m).is_ok(); + auto forward_info = get_message_forward_info_object(m->forward_info); + auto interaction_info = get_message_interaction_info_object(dialog_id, m); + auto can_be_edited = for_event_log ? false : can_edit_message(dialog_id, m, false, td_->auth_manager_->is_bot()); + auto can_be_forwarded = for_event_log ? false : can_forward_message(dialog_id, m); + auto can_get_statistics = for_event_log ? false : can_get_message_statistics(dialog_id, m); + auto can_get_message_thread = for_event_log ? false : get_top_thread_full_message_id(dialog_id, m).is_ok(); + auto can_get_viewers = for_event_log ? false : can_get_message_viewers(dialog_id, m).is_ok(); + auto can_get_media_timestamp_links = for_event_log ? false : can_get_media_timestamp_link(dialog_id, m).is_ok(); auto via_bot_user_id = td_->contacts_manager_->get_user_id_object(m->via_bot_user_id, "via_bot_user_id"); auto media_album_id = for_event_log ? static_cast(0) : m->media_album_id; auto reply_to_message_id = for_event_log ? static_cast(0) : m->reply_to_message_id.get(); auto reply_in_dialog_id = reply_to_message_id == 0 ? DialogId() : (m->reply_in_dialog_id.is_valid() ? m->reply_in_dialog_id : dialog_id); auto top_thread_message_id = for_event_log || is_scheduled ? static_cast(0) : m->top_thread_message_id.get(); - bool contains_unread_mention = for_event_log ? false : m->contains_unread_mention; - auto live_location_date = m->is_failed_to_send ? 0 : m->date; + auto contains_unread_mention = for_event_log ? false : m->contains_unread_mention; auto date = is_scheduled ? 0 : m->date; auto edit_date = m->hide_edit_date ? 0 : m->edit_date; auto is_pinned = for_event_log || is_scheduled ? false : m->is_pinned; - bool skip_bot_commands = for_event_log ? true : need_skip_bot_commands(dialog_id, m); - int32 max_media_timestamp = + auto has_timestamped_media = for_event_log || reply_to_message_id == 0 || m->max_own_media_timestamp >= 0; + auto reply_markup = get_reply_markup_object(m->reply_markup); + + auto live_location_date = m->is_failed_to_send ? 0 : m->date; + auto skip_bot_commands = for_event_log ? true : need_skip_bot_commands(dialog_id, m); + auto max_media_timestamp = for_event_log ? get_message_own_max_media_timestamp(m) : get_message_max_media_timestamp(m); - bool has_timestamped_media = for_event_log || reply_to_message_id == 0 || m->max_own_media_timestamp >= 0; + auto content = get_message_content_object(m->content.get(), td_, dialog_id, live_location_date, m->is_content_secret, + skip_bot_commands, max_media_timestamp); return make_tl_object( - m->message_id.get(), get_message_sender_object_const(m->sender_user_id, m->sender_dialog_id, source), - dialog_id.get(), std::move(sending_state), std::move(scheduling_state), is_outgoing, is_pinned, can_be_edited, - can_be_forwarded, can_delete_for_self, can_delete_for_all_users, can_get_statistics, can_get_message_thread, - can_get_viewers, can_get_media_timestamp_links, has_timestamped_media, m->is_channel_post, - contains_unread_mention, date, edit_date, get_message_forward_info_object(m->forward_info), - get_message_interaction_info_object(dialog_id, m), reply_in_dialog_id.get(), reply_to_message_id, - top_thread_message_id, ttl, ttl_expires_in, via_bot_user_id, m->author_signature, media_album_id, - get_restriction_reason_description(m->restriction_reasons), - get_message_content_object(m->content.get(), td_, dialog_id, live_location_date, m->is_content_secret, - skip_bot_commands, max_media_timestamp), - get_reply_markup_object(m->reply_markup)); + m->message_id.get(), std::move(sender), dialog_id.get(), std::move(sending_state), std::move(scheduling_state), + is_outgoing, is_pinned, can_be_edited, can_be_forwarded, can_delete_for_self, can_delete_for_all_users, + can_get_statistics, can_get_message_thread, can_get_viewers, can_get_media_timestamp_links, has_timestamped_media, + m->is_channel_post, contains_unread_mention, date, edit_date, std::move(forward_info), + std::move(interaction_info), reply_in_dialog_id.get(), reply_to_message_id, top_thread_message_id, ttl, + ttl_expires_in, via_bot_user_id, m->author_signature, media_album_id, + get_restriction_reason_description(m->restriction_reasons), std::move(content), std::move(reply_markup)); } tl_object_ptr MessagesManager::get_messages_object(int32 total_count, DialogId dialog_id, @@ -28648,10 +28651,11 @@ void MessagesManager::send_update_chat_last_message_impl(const Dialog *d, const LOG_CHECK(d->is_update_new_chat_sent) << "Wrong " << d->dialog_id << " in send_update_chat_last_message from " << source; LOG(INFO) << "Send updateChatLastMessage in " << d->dialog_id << " to " << d->last_message_id << " from " << source; - auto update = make_tl_object( - d->dialog_id.get(), - get_message_object(d->dialog_id, get_message(d, d->last_message_id), "send_update_chat_last_message_impl"), - get_chat_positions_object(d)); + const auto *m = get_message(d, d->last_message_id); + auto message_object = get_message_object(d->dialog_id, m, "send_update_chat_last_message_impl"); + auto positions_object = get_chat_positions_object(d); + auto update = td_api::make_object(d->dialog_id.get(), std::move(message_object), + std::move(positions_object)); send_closure(G()->td(), &Td::send_update, std::move(update)); } diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index e5c56df16..f26e34fdc 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -560,7 +560,7 @@ class TestProxyRequest final : public RequestOnceActor { auto handshake = make_unique(dc_id_, 3600); auto data = r_data.move_as_ok(); auto raw_connection = - mtproto::RawConnection::create(data.ip_address, std::move(data.socket_fd), get_transport(), nullptr); + mtproto::RawConnection::create(data.ip_address, std::move(data.buffered_socket_fd), get_transport(), nullptr); child_ = create_actor( "HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique(), 10.0, PromiseCreator::lambda([actor_id = actor_id(this)](Result> raw_connection) { diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index 14700f18e..a4d927171 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -328,12 +328,12 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise promise) { continue; } - ping_proxy_socket_fd(std::move(ip_address), r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), - PSTRING() << info.option->get_ip_address(), - PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { - send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, - std::move(result)); - })); + ping_proxy_buffered_socket_fd(std::move(ip_address), BufferedFd(r_socket_fd.move_as_ok()), + r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(), + PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { + send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, + std::move(result)); + })); } return; } @@ -375,8 +375,9 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } - send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, ip_address, - r_connection_data.move_as_ok().socket_fd, std::move(transport_type), std::move(debug_str), + auto connection_data = r_connection_data.move_as_ok(); + send_closure(actor_id, &ConnectionCreator::ping_proxy_buffered_socket_fd, ip_address, + std::move(connection_data.buffered_socket_fd), std::move(transport_type), std::move(debug_str), std::move(promise)); }); CHECK(proxy.use_proxy()); @@ -389,12 +390,12 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address } } -void ConnectionCreator::ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, - mtproto::TransportType transport_type, string debug_str, - Promise promise) { +void ConnectionCreator::ping_proxy_buffered_socket_fd(IPAddress ip_address, BufferedFd buffered_socket_fd, + mtproto::TransportType transport_type, string debug_str, + Promise promise) { auto token = next_token(); auto raw_connection = - mtproto::RawConnection::create(ip_address, std::move(socket_fd), std::move(transport_type), nullptr); + mtproto::RawConnection::create(ip_address, std::move(buffered_socket_fd), std::move(transport_type), nullptr); children_[token] = { false, create_ping_actor(debug_str, std::move(raw_connection), nullptr, PromiseCreator::lambda([promise = std::move(promise)]( @@ -651,8 +652,9 @@ void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address, mtpro if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } - auto raw_connection = - mtproto::RawConnection::create(ip_address, r_connection_data.move_as_ok().socket_fd, transport_type, nullptr); + auto connection_data = r_connection_data.move_as_ok(); + auto raw_connection = mtproto::RawConnection::create(ip_address, std::move(connection_data.buffered_socket_fd), + transport_type, nullptr); raw_connection->extra().extra = network_generation; promise.set_value(std::move(raw_connection)); }); @@ -754,19 +756,19 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd , use_connection_token_(use_connection_token) , was_connected_(was_connected) { } - void set_result(Result result) final { - if (result.is_error()) { + void set_result(Result> r_buffered_socket_fd) final { + if (r_buffered_socket_fd.is_error()) { if (use_connection_token_) { connection_token_ = mtproto::ConnectionManager::ConnectionToken(); } if (was_connected_ && stats_callback_) { stats_callback_->on_error(); } - promise_.set_error(Status::Error(400, result.error().public_message())); + promise_.set_error(Status::Error(400, r_buffered_socket_fd.error().public_message())); } else { ConnectionData data; data.ip_address = ip_address_; - data.socket_fd = result.move_as_ok(); + data.buffered_socket_fd = r_buffered_socket_fd.move_as_ok(); data.connection_token = std::move(connection_token_); data.stats_callback = std::move(stats_callback_); promise_.set_value(std::move(data)); @@ -785,7 +787,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd mtproto::ConnectionManager::ConnectionToken connection_token_; IPAddress ip_address_; unique_ptr stats_callback_; - bool use_connection_token_; + bool use_connection_token_{false}; bool was_connected_{false}; }; VLOG(connections) << "Start " @@ -814,7 +816,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd ConnectionData data; data.ip_address = ip_address; - data.socket_fd = std::move(socket_fd); + data.buffered_socket_fd = BufferedFd(std::move(socket_fd)); data.stats_callback = std::move(stats_callback); promise.set_result(std::move(data)); return {}; @@ -991,7 +993,7 @@ void ConnectionCreator::client_create_raw_connection(Result r_co auto connection_data = r_connection_data.move_as_ok(); auto raw_connection = - mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.socket_fd), + mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.buffered_socket_fd), std::move(transport_type), std::move(connection_data.stats_callback)); raw_connection->set_connection_token(std::move(connection_data.connection_token)); diff --git a/td/telegram/net/ConnectionCreator.h b/td/telegram/net/ConnectionCreator.h index 5a5f69086..42561347c 100644 --- a/td/telegram/net/ConnectionCreator.h +++ b/td/telegram/net/ConnectionCreator.h @@ -25,6 +25,7 @@ #include "td/actor/PromiseFuture.h" #include "td/actor/SignalSlot.h" +#include "td/utils/BufferedFd.h" #include "td/utils/common.h" #include "td/utils/FloodControlStrict.h" #include "td/utils/logging.h" @@ -81,7 +82,7 @@ class ConnectionCreator final : public NetQueryCallback { struct ConnectionData { IPAddress ip_address; - SocketFd socket_fd; + BufferedFd buffered_socket_fd; mtproto::ConnectionManager::ConnectionToken connection_token; unique_ptr stats_callback; }; @@ -246,8 +247,8 @@ class ConnectionCreator final : public NetQueryCallback { void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise promise); - void ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, mtproto::TransportType transport_type, - string debug_str, Promise promise); + void ping_proxy_buffered_socket_fd(IPAddress ip_address, BufferedFd buffered_socket_fd, + mtproto::TransportType transport_type, string debug_str, Promise promise); void on_ping_main_dc_result(uint64 token, Result result); }; diff --git a/td/telegram/net/Session.cpp b/td/telegram/net/Session.cpp index 4b6e4c54a..c7289d869 100644 --- a/td/telegram/net/Session.cpp +++ b/td/telegram/net/Session.cpp @@ -500,9 +500,11 @@ void Session::on_closed(Status status) { raw_connection->close(); if (status.is_error()) { - LOG(WARNING) << "Session closed: " << status << " " << current_info_->connection->get_name(); + LOG(WARNING) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " " + << current_info_->connection->get_name(); } else { - LOG(INFO) << "Session closed: " << status << " " << current_info_->connection->get_name(); + LOG(INFO) << "Session with " << sent_queries_.size() << " pending requests was closed: " << status << " " + << current_info_->connection->get_name(); } if (status.is_error() && status.code() == -404) { diff --git a/tdnet/td/net/TransparentProxy.h b/tdnet/td/net/TransparentProxy.h index 6bb40ab4b..deadb6010 100644 --- a/tdnet/td/net/TransparentProxy.h +++ b/tdnet/td/net/TransparentProxy.h @@ -28,7 +28,7 @@ class TransparentProxy : public Actor { Callback &operator=(const Callback &) = delete; virtual ~Callback() = default; - virtual void set_result(Result) = 0; + virtual void set_result(Result> r_buffered_socket_fd) = 0; virtual void on_connected() = 0; }; diff --git a/test/mtproto.cpp b/test/mtproto.cpp index d30b0b2d4..11fc2e724 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -217,7 +217,7 @@ class TestPingActor final : public Actor { } ping_connection_ = mtproto::PingConnection::create_req_pq( - mtproto::RawConnection::create(ip_address_, r_socket.move_as_ok(), + mtproto::RawConnection::create(ip_address_, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr), 3); @@ -339,7 +339,7 @@ class HandshakeTestActor final : public Actor { } raw_connection_ = mtproto::RawConnection::create( - ip_address, r_socket.move_as_ok(), + ip_address, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); } if (!wait_for_handshake_ && !handshake_) { @@ -438,22 +438,22 @@ RegisterTest mtproto_handshake("Mtproto_handshake"); class Socks5TestActor final : public Actor { public: void start_up() final { - auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result res) { + auto promise = PromiseCreator::lambda([actor_id = actor_id(this)](Result> res) { send_closure(actor_id, &Socks5TestActor::on_result, std::move(res), false); }); class Callback final : public TransparentProxy::Callback { public: - explicit Callback(Promise promise) : promise_(std::move(promise)) { + explicit Callback(Promise> promise) : promise_(std::move(promise)) { } - void set_result(Result result) final { + void set_result(Result> result) final { promise_.set_result(std::move(result)); } void on_connected() final { } private: - Promise promise_; + Promise> promise_; }; IPAddress socks5_ip; @@ -470,7 +470,7 @@ class Socks5TestActor final : public Actor { } private: - void on_result(Result res, bool dummy) { + void on_result(Result> res, bool dummy) { res.ensure(); Scheduler::instance()->finish(); } @@ -545,7 +545,7 @@ class FastPingTestActor final : public Actor { } auto raw_connection = mtproto::RawConnection::create( - ip_address, r_socket.move_as_ok(), + ip_address, BufferedFd(r_socket.move_as_ok()), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr); auto handshake = make_unique(get_default_dc_id(), 60 * 100 /*temp*/); create_actor( @@ -676,7 +676,7 @@ TEST(Mtproto, TlsTransport) { void start_up() final { class Callback final : public TransparentProxy::Callback { public: - void set_result(Result result) final { + void set_result(Result> result) final { if (result.is_ok()) { LOG(ERROR) << "Unexpectedly succeeded to connect to MTProto proxy"; } else if (result.error().message() != "Response hash mismatch") {