Merge commit 'fbc7e5a7f845d6c8df12b11f5b9a5c9ffc6eea0e'

This commit is contained in:
Andrea Cavalli 2021-03-28 12:57:26 +02:00
commit 0c653e2526
38 changed files with 875 additions and 363 deletions

4
.gitattributes vendored
View File

@ -5,6 +5,7 @@
*.h text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.c text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.tl text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.mm text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.txt text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.sh text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent eol=lf
*.php text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
@ -18,6 +19,7 @@
*.java text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.py text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.js text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.patch text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.swift text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.pbxproj text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
*.cs text whitespace=blank-at-eol,space-before-tab,blank-at-eof,tab-in-indent
@ -32,7 +34,5 @@
sqlite/sqlite/* linguist-vendored
*.tlo binary
*.pfx binary
*.png binary

View File

@ -197,9 +197,9 @@ if (IOS_PLATFORM STREQUAL "OS")
elseif (IOS_PLATFORM STREQUAL "SIMULATOR")
set (IOS_ARCH "i386;x86_64")
elseif (IOS_PLATFORM STREQUAL "WATCHOS")
set (IOS_ARCH "armv7k")
set (IOS_ARCH "armv7k;arm64_32")
elseif (IOS_PLATFORM STREQUAL "WATCHSIMULATOR")
set (IOS_ARCH "i386")
set (IOS_ARCH "i386;x86_64")
elseif (IOS_PLATFORM STREQUAL "TVOS")
set (IOS_ARCH "arm64")
elseif (IOS_PLATFORM STREQUAL "TVSIMULATOR")

View File

@ -31,6 +31,7 @@ prevent_in_source_build()
option(TD_ENABLE_JNI "Use \"ON\" to enable JNI-compatible TDLib API.")
option(TD_ENABLE_DOTNET "Use \"ON\" to enable generation of C++/CLI or C++/CX TDLib API bindings.")
option(TD_EXPERIMENTAL_WATCH_OS "Use \"ON\" to enable watch os support.")
if (TD_ENABLE_DOTNET AND (CMAKE_VERSION VERSION_LESS "3.1.0"))
message(FATAL_ERROR "CMake 3.1.0 or higher is required. You are running version ${CMAKE_VERSION}.")

View File

@ -172,6 +172,7 @@ class TdExample {
auto it = handlers_.find(response.request_id);
if (it != handlers_.end()) {
it->second(std::move(response.object));
handlers_.erase(it);
}
}

View File

@ -0,0 +1,16 @@
diff --git a/Makefile b/Makefile
index 695be54..bce31b9 100644
--- a/Makefile
+++ b/Makefile
@@ -56,9 +56,10 @@ CFLAGS-appletvos.arm64=-fembed-bitcode
PYTHON_CONFIGURE-tvOS=ac_cv_func_sigaltstack=no
# watchOS targets
-TARGETS-watchOS=watchsimulator.i386 watchos.armv7k
+TARGETS-watchOS=watchsimulator.i386 watchsimulator.x86_64 watchos.armv7k watchos.arm64_32
CFLAGS-watchOS=-mwatchos-version-min=4.0
CFLAGS-watchos.armv7k=-fembed-bitcode
+CFLAGS-watchos.arm64_32=-fembed-bitcode
PYTHON_CONFIGURE-watchOS=ac_cv_func_sigaltstack=no
# override machine types for arm64

View File

@ -3,10 +3,12 @@
git clone https://github.com/pybee/Python-Apple-support
cd Python-Apple-support
git checkout 60b990128d5f1f04c336ff66594574515ab56604
git apply ../Python-Apple-support.patch
cd ..
#TODO: change openssl version
platforms="macOS iOS watchOS tvOS"
#platforms="watchOS"
for platform in $platforms;
do
echo $platform

View File

@ -6,6 +6,7 @@ mkdir -p build
cd build
platforms="macOS iOS watchOS tvOS"
#platforms="watchOS"
for platform in $platforms;
do
echo "Platform = ${platform} Simulator = ${simulator}"
@ -45,8 +46,10 @@ do
else
ios_platform="OS"
fi
watchos=""
if [[ $platform = "watchOS" ]]; then
ios_platform="WATCH${ios_platform}"
watchos="-DTD_EXPERIMENTAL_WATCH_OS=ON"
fi
if [[ $platform = "tvOS" ]]; then
ios_platform="TV${ios_platform}"
@ -56,7 +59,7 @@ do
mkdir -p $build
mkdir -p $install
cd $build
cmake $td_path $options -DIOS_PLATFORM=${ios_platform} -DCMAKE_TOOLCHAIN_FILE=${td_path}/CMake/iOS.cmake -DCMAKE_INSTALL_PREFIX=../${install}
cmake $td_path $options $watchos -DIOS_PLATFORM=${ios_platform} -DCMAKE_TOOLCHAIN_FILE=${td_path}/CMake/iOS.cmake -DCMAKE_INSTALL_PREFIX=../${install}
make -j3 install || exit
cd ..
done

View File

@ -2152,6 +2152,7 @@ groupCallJoinResponseStream = GroupCallJoinResponse;
//@participant Identifier of the group call participant
//@source User's synchronization source
//@bio The participant user's bio or the participant chat's description
//@is_current_user True, if the participant is the current user
//@is_speaking True, if the participant is speaking as set by setGroupCallParticipantIsSpeaking
//@is_hand_raised True, if the participant hand is raised
//@can_be_muted_for_all_users True, if the current user can mute the participant for all other group call participants
@ -2163,7 +2164,7 @@ groupCallJoinResponseStream = GroupCallJoinResponse;
//@can_unmute_self True, if the participant is muted for all users, but can unmute themself
//@volume_level Participant's volume level; 1-20000 in hundreds of percents
//@order User's order in the group call participant list. Orders must be compared lexicographically. The bigger is order, the higher is user in the list. If order is empty, the user must be removed from the participant list
groupCallParticipant participant:MessageSender source:int32 bio:string is_speaking:Bool is_hand_raised:Bool can_be_muted_for_all_users:Bool can_be_unmuted_for_all_users:Bool can_be_muted_for_current_user:Bool can_be_unmuted_for_current_user:Bool is_muted_for_all_users:Bool is_muted_for_current_user:Bool can_unmute_self:Bool volume_level:int32 order:string = GroupCallParticipant;
groupCallParticipant participant:MessageSender source:int32 bio:string is_current_user:Bool is_speaking:Bool is_hand_raised:Bool can_be_muted_for_all_users:Bool can_be_unmuted_for_all_users:Bool can_be_muted_for_current_user:Bool can_be_unmuted_for_current_user:Bool is_muted_for_all_users:Bool is_muted_for_current_user:Bool can_unmute_self:Bool volume_level:int32 order:string = GroupCallParticipant;
//@class CallProblem @description Describes the exact type of a problem with a call
@ -4232,7 +4233,7 @@ viewMessages chat_id:int53 message_thread_id:int53 message_ids:vector<int53> for
//@description Informs TDLib that the message content has been opened (e.g., the user has opened a photo, video, document, location or venue, or has listened to an audio file or voice note message). An updateMessageContentOpened update will be generated if something has changed @chat_id Chat identifier of the message @message_id Identifier of the message with the opened content
openMessageContent chat_id:int53 message_id:int53 = Ok;
//@description Returns information about an action to be done when the current user clicks an HTTP link. This method can be used to automatically authorize the current user on a website @link The HTTP link
//@description Returns information about an action to be done when the current user clicks an HTTP link. This method can be used to automatically authorize the current user on a website. Don't use this method for links from secret chats if link preview is disabled in secret chats @link The HTTP link
getExternalLinkInfo link:string = LoginUrlInfo;
//@description Returns an HTTP URL which can be used to automatically authorize the current user on a website after clicking an HTTP link. Use the method getExternalLinkInfo to find whether a prior user confirmation is needed

View File

@ -15,5 +15,5 @@ endif()
add_executable(${PROJECT_NAME} ${SOURCES})
if (NOT WIN32)
target_link_libraries(${PROJECT_NAME} m)
target_link_libraries(${PROJECT_NAME} PRIVATE m)
endif()

View File

@ -55,8 +55,8 @@ void HandshakeActor::return_connection(Status status) {
CHECK(!raw_connection_promise_);
return;
}
if (status.is_error() && !raw_connection->debug_str_.empty()) {
status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->debug_str_);
if (status.is_error() && !raw_connection->extra().debug_str.empty()) {
status = Status::Error(status.code(), PSLICE() << status.message() << " : " << raw_connection->extra().debug_str);
}
Scheduler::unsubscribe(raw_connection->get_poll_info().get_pollable_fd_ref());
if (raw_connection_promise_) {

View File

@ -85,7 +85,7 @@ ActorOwn<> create_ping_actor(string debug, unique_ptr<RawConnection> raw_connect
raw_connection->close();
promise_.set_error(std::move(status));
} else {
raw_connection->rtt_ = ping_connection_->rtt();
raw_connection->extra().rtt = ping_connection_->rtt();
if (raw_connection->stats_callback()) {
raw_connection->stats_callback()->on_pong();
}

View File

@ -7,156 +7,455 @@
#include "td/mtproto/RawConnection.h"
#include "td/mtproto/AuthKey.h"
#include "td/mtproto/IStreamTransport.h"
#include "td/mtproto/ProxySecret.h"
#include "td/mtproto/Transport.h"
#if TD_EXPERIMENTAL_WATCH_OS
#include "td/net/DarwinHttp.h"
#endif
#include "td/utils/BufferedFd.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/port/EventFd.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StorerBase.h"
#include <map>
#include <memory>
#include <utility>
namespace td {
namespace mtproto {
void RawConnection::send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
uint64 quick_ack_token) {
PacketInfo info;
info.version = 2;
info.no_crypto_flag = false;
info.salt = salt;
info.session_id = session_id;
info.use_random_padding = transport_->use_random_padding();
auto packet = BufferWriter{Transport::write(storer, auth_key, &info), transport_->max_prepend_size(),
transport_->max_append_size()};
Transport::write(storer, auth_key, &info, packet.as_slice());
bool use_quick_ack = false;
if (quick_ack_token != 0 && transport_->support_quick_ack()) {
auto tmp = quick_ack_to_token_.emplace(info.message_ack, quick_ack_token);
if (tmp.second) {
use_quick_ack = true;
} else {
LOG(ERROR) << "Quick ack " << info.message_ack << " collision";
}
class RawConnectionDefault : public RawConnection {
public:
RawConnectionDefault(SocketFd socket_fd, TransportType transport_type, unique_ptr<StatsCallback> stats_callback)
: socket_fd_(std::move(socket_fd))
, transport_(create_transport(transport_type))
, stats_callback_(std::move(stats_callback)) {
transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer());
}
transport_->write(std::move(packet), use_quick_ack);
}
uint64 RawConnection::send_no_crypto(const Storer &storer) {
PacketInfo info;
info.no_crypto_flag = true;
auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), transport_->max_prepend_size(),
transport_->max_append_size()};
Transport::write(storer, AuthKey(), &info, packet.as_slice());
LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice());
transport_->write(std::move(packet), false);
return info.message_id;
}
Status RawConnection::flush_read(const AuthKey &auth_key, Callback &callback) {
auto r = socket_fd_.flush_read();
if (r.is_ok()) {
if (stats_callback_) {
stats_callback_->on_read(r.ok());
}
callback.on_read(r.ok());
void set_connection_token(StateManager::ConnectionToken connection_token) override {
connection_token_ = std::move(connection_token);
}
while (transport_->can_read()) {
BufferSlice packet;
uint32 quick_ack = 0;
TRY_RESULT(wait_size, transport_->read_next(&packet, &quick_ack));
if (!is_aligned_pointer<4>(packet.as_slice().ubegin())) {
BufferSlice new_packet(packet.size());
new_packet.as_slice().copy_from(packet.as_slice());
packet = std::move(new_packet);
}
LOG_CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin()))
<< packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size;
if (wait_size != 0) {
constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024;
if (wait_size > MAX_PACKET_SIZE) {
return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size);
}
break;
}
if (quick_ack != 0) {
TRY_STATUS(on_quick_ack(quick_ack, callback));
continue;
}
bool can_send() const override {
return transport_->can_write();
}
TransportType get_transport_type() const override {
return transport_->get_type();
}
void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
uint64 quick_ack_token) override {
PacketInfo info;
info.version = 2;
info.no_crypto_flag = false;
info.salt = salt;
info.session_id = session_id;
info.use_random_padding = transport_->use_random_padding();
TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info));
switch (read_result.type()) {
case Transport::ReadResult::Quickack: {
TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback));
break;
auto packet = BufferWriter{Transport::write(storer, auth_key, &info), transport_->max_prepend_size(),
transport_->max_append_size()};
Transport::write(storer, auth_key, &info, packet.as_slice());
bool use_quick_ack = false;
if (quick_ack_token != 0 && transport_->support_quick_ack()) {
auto tmp = quick_ack_to_token_.emplace(info.message_ack, quick_ack_token);
if (tmp.second) {
use_quick_ack = true;
} else {
LOG(ERROR) << "Quick ack " << info.message_ack << " collision";
}
case Transport::ReadResult::Error: {
TRY_STATUS(on_read_mtproto_error(read_result.error()));
break;
}
transport_->write(std::move(packet), use_quick_ack);
}
uint64 send_no_crypto(const Storer &storer) override {
PacketInfo info;
info.no_crypto_flag = true;
auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), transport_->max_prepend_size(),
transport_->max_append_size()};
Transport::write(storer, AuthKey(), &info, packet.as_slice());
LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice());
transport_->write(std::move(packet), false);
return info.message_id;
}
PollableFdInfo &get_poll_info() override {
return socket_fd_.get_poll_info();
}
StatsCallback *stats_callback() override {
return stats_callback_.get();
}
// NB: After first returned error, all subsequent calls will return error too.
Status flush(const AuthKey &auth_key, Callback &callback) override {
auto status = do_flush(auth_key, callback);
if (status.is_error()) {
if (stats_callback_ && status.code() != 2) {
stats_callback_->on_error();
}
case Transport::ReadResult::Packet: {
// If a packet was successfully decrypted, then it is ok to assume that the connection is alive
if (!auth_key.empty()) {
if (stats_callback_) {
stats_callback_->on_pong();
}
has_error_ = true;
}
return status;
}
bool has_error() const override {
return has_error_;
}
void close() override {
transport_.reset();
socket_fd_.close();
}
PublicFields &extra() override {
return extra_;
}
const PublicFields &extra() const override {
return extra_;
}
private:
PublicFields extra_;
BufferedFd<SocketFd> socket_fd_;
unique_ptr<IStreamTransport> transport_;
std::map<uint32, uint64> quick_ack_to_token_;
bool has_error_{false};
unique_ptr<StatsCallback> stats_callback_;
StateManager::ConnectionToken connection_token_;
Status flush_read(const AuthKey &auth_key, Callback &callback) {
auto r = socket_fd_.flush_read();
if (r.is_ok()) {
if (stats_callback_) {
stats_callback_->on_read(r.ok());
}
callback.on_read(r.ok());
}
while (transport_->can_read()) {
BufferSlice packet;
uint32 quick_ack = 0;
TRY_RESULT(wait_size, transport_->read_next(&packet, &quick_ack));
if (!is_aligned_pointer<4>(packet.as_slice().ubegin())) {
BufferSlice new_packet(packet.size());
new_packet.as_slice().copy_from(packet.as_slice());
packet = std::move(new_packet);
}
LOG_CHECK(is_aligned_pointer<4>(packet.as_slice().ubegin()))
<< packet.as_slice().ubegin() << ' ' << packet.size() << ' ' << wait_size;
if (wait_size != 0) {
constexpr size_t MAX_PACKET_SIZE = (1 << 22) + 1024;
if (wait_size > MAX_PACKET_SIZE) {
return Status::Error(PSLICE() << "Expected packet size is too big: " << wait_size);
}
TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet())));
break;
}
case Transport::ReadResult::Nop:
break;
default:
UNREACHABLE();
if (quick_ack != 0) {
TRY_STATUS(on_quick_ack(quick_ack, callback));
continue;
}
PacketInfo info;
info.version = 2;
TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info));
switch (read_result.type()) {
case Transport::ReadResult::Quickack: {
TRY_STATUS(on_quick_ack(read_result.quick_ack(), callback));
break;
}
case Transport::ReadResult::Error: {
TRY_STATUS(on_read_mtproto_error(read_result.error()));
break;
}
case Transport::ReadResult::Packet: {
// If a packet was successfully decrypted, then it is ok to assume that the connection is alive
if (!auth_key.empty()) {
if (stats_callback_) {
stats_callback_->on_pong();
}
}
TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet())));
break;
}
case Transport::ReadResult::Nop:
break;
default:
UNREACHABLE();
}
}
}
TRY_STATUS(std::move(r));
return Status::OK();
}
Status RawConnection::on_read_mtproto_error(int32 error_code) {
if (error_code == -429) {
if (stats_callback_) {
stats_callback_->on_mtproto_error();
}
return Status::Error(500, PSLICE() << "MTProto error: " << error_code);
}
if (error_code == -404) {
return Status::Error(-404, PSLICE() << "MTProto error: " << error_code);
}
return Status::Error(PSLICE() << "MTProto error: " << error_code);
}
Status RawConnection::on_quick_ack(uint32 quick_ack, Callback &callback) {
auto it = quick_ack_to_token_.find(quick_ack);
if (it == quick_ack_to_token_.end()) {
LOG(WARNING) << Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack);
TRY_STATUS(std::move(r));
return Status::OK();
// TODO: return Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack);
}
auto token = it->second;
quick_ack_to_token_.erase(it);
callback.on_quick_ack(token).ignore();
return Status::OK();
}
Status RawConnection::flush_write() {
TRY_RESULT(size, socket_fd_.flush_write());
if (size > 0 && stats_callback_) {
stats_callback_->on_write(size);
Status on_read_mtproto_error(int32 error_code) {
if (error_code == -429) {
if (stats_callback_) {
stats_callback_->on_mtproto_error();
}
return Status::Error(500, PSLICE() << "MTProto error: " << error_code);
}
if (error_code == -404) {
return Status::Error(-404, PSLICE() << "MTProto error: " << error_code);
}
return Status::Error(PSLICE() << "MTProto error: " << error_code);
}
return Status::OK();
Status on_quick_ack(uint32 quick_ack, Callback &callback) {
auto it = quick_ack_to_token_.find(quick_ack);
if (it == quick_ack_to_token_.end()) {
LOG(WARNING) << Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack);
return Status::OK();
// TODO: return Status::Error(PSLICE() << "Unknown quick_ack " << quick_ack);
}
auto token = it->second;
quick_ack_to_token_.erase(it);
callback.on_quick_ack(token).ignore();
return Status::OK();
}
Status flush_write() {
TRY_RESULT(size, socket_fd_.flush_write());
if (size > 0 && stats_callback_) {
stats_callback_->on_write(size);
}
return Status::OK();
}
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
if (has_error_) {
return Status::Error("Connection has already failed");
}
sync_with_poll(socket_fd_);
// read/write
// EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too.
// just close connection and hope that read or write will not return this error too.
TRY_STATUS(socket_fd_.get_pending_error());
TRY_STATUS(flush_read(auth_key, callback));
TRY_STATUS(callback.before_write());
TRY_STATUS(flush_write());
if (can_close_local(socket_fd_)) {
return Status::Error("Connection closed");
}
return Status::OK();
}
};
#if TD_EXPERIMENTAL_WATCH_OS
class RawConnectionHttp : public RawConnection {
public:
RawConnectionHttp(IPAddress ip_address, unique_ptr<StatsCallback> stats_callback)
: ip_address_(std::move(ip_address)), stats_callback_(std::move(stats_callback)) {
answers_ = std::make_shared<MpscPollableQueue<Result<BufferSlice>>>();
answers_->init();
}
void set_connection_token(StateManager::ConnectionToken connection_token) override {
connection_token_ = std::move(connection_token);
}
bool can_send() const override {
return mode_ == Send;
}
TransportType get_transport_type() const override {
return mtproto::TransportType{mtproto::TransportType::Http, 0, mtproto::ProxySecret()};
}
void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
uint64 quick_ack_token) override {
PacketInfo info;
info.version = 2;
info.no_crypto_flag = false;
info.salt = salt;
info.session_id = session_id;
info.use_random_padding = false;
auto packet = BufferWriter{Transport::write(storer, auth_key, &info), 0, 0};
Transport::write(storer, auth_key, &info, packet.as_slice());
send_packet(packet.as_buffer_slice());
}
uint64 send_no_crypto(const Storer &storer) override {
PacketInfo info;
info.no_crypto_flag = true;
auto packet = BufferWriter{Transport::write(storer, AuthKey(), &info), 0, 0};
Transport::write(storer, AuthKey(), &info, packet.as_slice());
LOG(INFO) << "Send handshake packet: " << format::as_hex_dump<4>(packet.as_slice());
send_packet(packet.as_buffer_slice());
return info.message_id;
}
PollableFdInfo &get_poll_info() override {
return answers_->reader_get_event_fd().get_poll_info();
}
StatsCallback *stats_callback() override {
return stats_callback_.get();
}
// NB: After first returned error, all subsequent calls will return error too.
Status flush(const AuthKey &auth_key, Callback &callback) override {
auto status = do_flush(auth_key, callback);
if (status.is_error()) {
if (stats_callback_ && status.code() != 2) {
stats_callback_->on_error();
}
has_error_ = true;
}
return status;
}
bool has_error() const override {
return has_error_;
}
void close() override {
}
PublicFields &extra() override {
return extra_;
}
const PublicFields &extra() const override {
return extra_;
}
private:
PublicFields extra_;
IPAddress ip_address_;
bool has_error_{false};
EventFd event_fd_;
enum Mode { Send, Receive } mode_{Send};
unique_ptr<StatsCallback> stats_callback_;
StateManager::ConnectionToken connection_token_;
std::shared_ptr<MpscPollableQueue<Result<BufferSlice>>> answers_;
std::vector<BufferSlice> to_send_;
void send_packet(BufferSlice packet) {
CHECK(mode_ == Send);
mode_ = Receive;
to_send_.push_back(std::move(packet));
}
Status flush_read(const AuthKey &auth_key, Callback &callback) {
while (true) {
auto packets_n = answers_->reader_wait_nonblock();
if (packets_n == 0) {
break;
}
for (int i = 0; i < packets_n; i++) {
TRY_RESULT(packet, answers_->reader_get_unsafe());
if (stats_callback_) {
stats_callback_->on_read(packet.size());
}
callback.on_read(packet.size());
CHECK(mode_ == Receive);
mode_ = Send;
PacketInfo info;
info.version = 2;
TRY_RESULT(read_result, Transport::read(packet.as_slice(), auth_key, &info));
switch (read_result.type()) {
case Transport::ReadResult::Quickack: {
break;
}
case Transport::ReadResult::Error: {
TRY_STATUS(on_read_mtproto_error(read_result.error()));
break;
}
case Transport::ReadResult::Packet: {
// If a packet was successfully decrypted, then it is ok to assume that the connection is alive
if (!auth_key.empty()) {
if (stats_callback_) {
stats_callback_->on_pong();
}
}
TRY_STATUS(callback.on_raw_packet(info, packet.from_slice(read_result.packet())));
break;
}
case Transport::ReadResult::Nop:
break;
default:
UNREACHABLE();
}
}
}
return Status::OK();
}
Status on_read_mtproto_error(int32 error_code) {
if (error_code == -429) {
if (stats_callback_) {
stats_callback_->on_mtproto_error();
}
return Status::Error(500, PSLICE() << "MTProto error: " << error_code);
}
if (error_code == -404) {
return Status::Error(-404, PSLICE() << "MTProto error: " << error_code);
}
return Status::Error(PSLICE() << "MTProto error: " << error_code);
}
Status flush_write() {
for (auto &packet : to_send_) {
TRY_STATUS(do_send(packet.as_slice()));
if (packet.size() > 0 && stats_callback_) {
stats_callback_->on_write(packet.size());
}
}
to_send_.clear();
return Status::OK();
}
Status do_send(Slice data) {
DarwinHttp::post(PSLICE() << "http://" << ip_address_.get_ip_str() << ":" << ip_address_.get_port() << "/api", data,
[answers = answers_](auto res) { answers->writer_put(std::move(res)); });
return Status::OK();
}
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
if (has_error_) {
return Status::Error("Connection has already failed");
}
TRY_STATUS(flush_read(auth_key, callback));
TRY_STATUS(callback.before_write());
TRY_STATUS(flush_write());
return Status::OK();
}
};
#endif
unique_ptr<RawConnection> RawConnection::create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type,
unique_ptr<StatsCallback> stats_callback) {
#if TD_EXPERIMENTAL_WATCH_OS
return td::make_unique<RawConnectionHttp>(ip_address, std::move(stats_callback));
#else
return td::make_unique<RawConnectionDefault>(std::move(socket_fd), transport_type, std::move(stats_callback));
#endif
}
} // namespace mtproto

View File

@ -6,22 +6,19 @@
//
#pragma once
#include "td/mtproto/IStreamTransport.h"
#include "td/telegram/StateManager.h"
#include "td/mtproto/PacketInfo.h"
#include "td/mtproto/TransportType.h"
#include "td/utils/buffer.h"
#include "td/utils/BufferedFd.h"
#include "td/utils/common.h"
#include "td/utils/port/detail/PollableFd.h"
#include "td/utils/port/IPAddress.h"
#include "td/utils/port/SocketFd.h"
#include "td/utils/Status.h"
#include "td/utils/StorerBase.h"
#include "td/telegram/StateManager.h"
#include <map>
namespace td {
namespace mtproto {
@ -40,33 +37,23 @@ class RawConnection {
virtual void on_mtproto_error() = 0;
};
RawConnection() = default;
RawConnection(SocketFd socket_fd, TransportType transport_type, unique_ptr<StatsCallback> stats_callback)
: socket_fd_(std::move(socket_fd))
, transport_(create_transport(transport_type))
, stats_callback_(std::move(stats_callback)) {
transport_->init(&socket_fd_.input_buffer(), &socket_fd_.output_buffer());
}
RawConnection(const RawConnection &) = delete;
RawConnection &operator=(const RawConnection &) = delete;
virtual ~RawConnection() = default;
void set_connection_token(StateManager::ConnectionToken connection_token) {
connection_token_ = std::move(connection_token);
}
static unique_ptr<RawConnection> create(IPAddress ip_address, SocketFd socket_fd, TransportType transport_type,
unique_ptr<StatsCallback> stats_callback);
bool can_send() const {
return transport_->can_write();
}
TransportType get_transport_type() const {
return transport_->get_type();
}
void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
uint64 quick_ack_token = 0);
uint64 send_no_crypto(const Storer &storer);
virtual void set_connection_token(StateManager::ConnectionToken connection_token) = 0;
PollableFdInfo &get_poll_info() {
return socket_fd_.get_poll_info();
}
StatsCallback *stats_callback() {
return stats_callback_.get();
}
virtual bool can_send() const = 0;
virtual TransportType get_transport_type() const = 0;
virtual void send_crypto(const Storer &storer, int64 session_id, int64 salt, const AuthKey &auth_key,
uint64 quick_ack_token = 0) = 0;
virtual uint64 send_no_crypto(const Storer &storer) = 0;
virtual PollableFdInfo &get_poll_info() = 0;
virtual StatsCallback *stats_callback() = 0;
class Callback {
public:
@ -86,65 +73,19 @@ class RawConnection {
};
// NB: After first returned error, all subsequent calls will return error too.
Status flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
auto status = do_flush(auth_key, callback);
if (status.is_error()) {
if (stats_callback_ && status.code() != 2) {
stats_callback_->on_error();
}
has_error_ = true;
}
return status;
}
virtual Status flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT = 0;
virtual bool has_error() const = 0;
bool has_error() const {
return has_error_;
}
virtual void close() = 0;
void close() {
transport_.reset();
socket_fd_.close();
}
struct PublicFields {
uint32 extra{0};
string debug_str;
double rtt{0};
};
uint32 extra_{0};
string debug_str_;
double rtt_{0};
private:
BufferedFd<SocketFd> socket_fd_;
unique_ptr<IStreamTransport> transport_;
std::map<uint32, uint64> quick_ack_to_token_;
bool has_error_{false};
unique_ptr<StatsCallback> stats_callback_;
StateManager::ConnectionToken connection_token_;
Status flush_read(const AuthKey &auth_key, Callback &callback);
Status flush_write();
Status on_quick_ack(uint32 quick_ack, Callback &callback);
Status on_read_mtproto_error(int32 error_code);
Status do_flush(const AuthKey &auth_key, Callback &callback) TD_WARN_UNUSED_RESULT {
if (has_error_) {
return Status::Error("Connection has already failed");
}
sync_with_poll(socket_fd_);
// read/write
// EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too.
// just close connection and hope that read or write will not return this error too.
TRY_STATUS(socket_fd_.get_pending_error());
TRY_STATUS(flush_read(auth_key, callback));
TRY_STATUS(callback.before_write());
TRY_STATUS(flush_write());
if (can_close_local(socket_fd_)) {
return Status::Error("Connection closed");
}
return Status::OK();
}
virtual PublicFields &extra() = 0;
virtual const PublicFields &extra() const = 0;
};
} // namespace mtproto

View File

@ -132,7 +132,7 @@ class SessionConnection
bool is_main_ = false;
int rtt() const {
return max(2, static_cast<int>(raw_connection_->rtt_ * 1.5 + 1));
return max(2, static_cast<int>(raw_connection_->extra().rtt * 1.5 + 1));
}
int32 read_disconnect_delay() const {

View File

@ -92,11 +92,11 @@ class Transport {
MutableSlice dest = MutableSlice());
static std::pair<uint32, UInt128> calc_message_key2(const AuthKey &auth_key, int X, Slice to_encrypt);
private:
template <class HeaderT>
static std::pair<uint32, UInt128> calc_message_ack_and_key(const HeaderT &head, size_t data_size);
template <class HeaderT>
static size_t calc_crypto_size(size_t data_size);

View File

@ -2293,7 +2293,7 @@ class InviteToChannelQuery : public Td::ResultHandler {
auto ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for InviteToChannelQuery: " << to_string(ptr);
td->contacts_manager_->invalidate_channel_full(channel_id_, false, false);
td->contacts_manager_->invalidate_channel_full(channel_id_, false);
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
}
@ -2328,7 +2328,7 @@ class EditChannelAdminQuery : public Td::ResultHandler {
auto ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for EditChannelAdminQuery: " << to_string(ptr);
td->contacts_manager_->invalidate_channel_full(channel_id_, false, false);
td->contacts_manager_->invalidate_channel_full(channel_id_, false);
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
}
@ -2363,7 +2363,7 @@ class EditChannelBannedQuery : public Td::ResultHandler {
auto ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for EditChannelBannedQuery: " << to_string(ptr);
td->contacts_manager_->invalidate_channel_full(channel_id_, false, false);
td->contacts_manager_->invalidate_channel_full(channel_id_, false);
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
}
@ -2469,7 +2469,7 @@ class EditChannelCreatorQuery : public Td::ResultHandler {
auto ptr = result_ptr.move_as_ok();
LOG(INFO) << "Receive result for EditChannelCreatorQuery: " << to_string(ptr);
td->contacts_manager_->invalidate_channel_full(channel_id_, false, false);
td->contacts_manager_->invalidate_channel_full(channel_id_, false);
td->updates_manager_->on_get_updates(std::move(ptr), std::move(promise_));
}
@ -3523,7 +3523,7 @@ void ContactsManager::on_channel_unban_timeout(ChannelId channel_id) {
LOG(INFO) << "Update " << channel_id << " status";
c->is_status_changed = true;
invalidate_channel_full(channel_id, false, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
update_channel(c, channel_id); // always call, because in case of failure we need to reactivate timeout
}
@ -7140,8 +7140,7 @@ Status ContactsManager::can_manage_dialog_invite_links(DialogId dialog_id, bool
if (!c->is_active) {
return Status::Error(3, "Chat is deactivated");
}
auto status = get_chat_status(c);
bool have_rights = creator_only ? status.is_creator() : status.is_administrator() && status.can_invite_users();
bool have_rights = creator_only ? c->status.is_creator() : c->status.can_manage_invite_links();
if (!have_rights) {
return Status::Error(3, "Not enough rights to manage chat invite link");
}
@ -7152,8 +7151,7 @@ Status ContactsManager::can_manage_dialog_invite_links(DialogId dialog_id, bool
if (c == nullptr) {
return Status::Error(3, "Chat info not found");
}
auto status = get_channel_status(c);
bool have_rights = creator_only ? status.is_creator() : status.is_administrator() && status.can_invite_users();
bool have_rights = creator_only ? c->status.is_creator() : c->status.can_manage_invite_links();
if (!have_rights) {
return Status::Error(3, "Not enough rights to manage chat invite link");
}
@ -9433,11 +9431,16 @@ void ContactsManager::on_load_chat_full_from_database(ChatId chat_id, string val
Chat *c = get_chat(chat_id);
CHECK(c != nullptr);
// ignore ChatFull without invite link
if (c->is_active && c->status.is_administrator() && c->status.can_invite_users() &&
!chat_full->invite_link.is_valid()) {
chats_full_.erase(chat_id);
return;
bool need_invite_link = c->is_active && c->status.can_manage_invite_links();
bool have_invite_link = chat_full->invite_link.is_valid();
if (need_invite_link != have_invite_link) {
if (need_invite_link) {
// ignore ChatFull without invite link
chats_full_.erase(chat_id);
return;
} else {
chat_full->invite_link = DialogInviteLink();
}
}
if (td_->file_manager_->get_file_view(c->photo.small_file_id).get_unique_file_id() !=
@ -9532,10 +9535,16 @@ void ContactsManager::on_load_channel_full_from_database(ChannelId channel_id, s
Channel *c = get_channel(channel_id);
CHECK(c != nullptr);
// ignore ChannelFull without invite link
if (c->status.is_administrator() && c->status.can_invite_users() && !channel_full->invite_link.is_valid()) {
channels_full_.erase(channel_id);
return;
bool need_invite_link = c->status.can_manage_invite_links();
bool have_invite_link = channel_full->invite_link.is_valid();
if (need_invite_link != have_invite_link) {
if (need_invite_link) {
// ignore ChannelFull without invite link
channels_full_.erase(channel_id);
return;
} else {
channel_full->invite_link = DialogInviteLink();
}
}
if (td_->file_manager_->get_file_view(c->photo.small_file_id).get_unique_file_id() !=
@ -11525,7 +11534,7 @@ bool ContactsManager::on_get_channel_error(ChannelId channel_id, const Status &s
remove_dialog_access_by_invite_link(DialogId(channel_id));
}
invalidate_channel_full(channel_id, false, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
LOG_IF(ERROR, have_input_peer_channel(c, channel_id, AccessRights::Read))
<< "Have read access to channel after receiving CHANNEL_PRIVATE. Channel state: "
<< oneline(to_string(get_supergroup_object(channel_id, c)))
@ -11775,7 +11784,7 @@ void ContactsManager::speculative_add_channel_participants(ChannelId channel_id,
bool by_me) {
if (by_me) {
// Currently ignore all changes made by the current user, because they may be already counted
invalidate_channel_full(channel_id, false, false); // just in case
invalidate_channel_full(channel_id, false); // just in case
return;
}
@ -11926,16 +11935,12 @@ void ContactsManager::drop_channel_photos(ChannelId channel_id, bool is_empty, b
}
}
void ContactsManager::invalidate_channel_full(ChannelId channel_id, bool need_drop_invite_link,
bool need_drop_slow_mode_delay) {
void ContactsManager::invalidate_channel_full(ChannelId channel_id, bool need_drop_slow_mode_delay) {
LOG(INFO) << "Invalidate supergroup full for " << channel_id;
// drop channel full cache
auto channel_full = get_channel_full_force(channel_id, "invalidate_channel_full");
if (channel_full != nullptr) {
channel_full->expires_at = 0.0;
if (need_drop_invite_link) {
on_update_channel_full_invite_link(channel_full, nullptr);
}
if (need_drop_slow_mode_delay && channel_full->slow_mode_delay != 0) {
channel_full->slow_mode_delay = 0;
channel_full->slow_mode_next_send_date = 0;
@ -11944,9 +11949,6 @@ void ContactsManager::invalidate_channel_full(ChannelId channel_id, bool need_dr
}
update_channel_full(channel_full, channel_id);
}
if (need_drop_invite_link) {
remove_dialog_access_by_invite_link(DialogId(channel_id));
}
}
void ContactsManager::on_update_chat_full_photo(ChatFull *chat_full, ChatId chat_id, Photo photo) {
@ -12591,6 +12593,7 @@ void ContactsManager::on_update_chat_status(Chat *c, ChatId chat_id, DialogParti
if (c->status != status) {
LOG(INFO) << "Update " << chat_id << " status from " << c->status << " to " << status;
bool need_reload_group_call = c->status.can_manage_calls() != status.can_manage_calls();
bool need_drop_invite_link = c->status.can_manage_invite_links() && !status.can_manage_invite_links();
c->status = status;
@ -12601,6 +12604,12 @@ void ContactsManager::on_update_chat_status(Chat *c, ChatId chat_id, DialogParti
c->pinned_message_version = -1;
drop_chat_full(chat_id);
} else if (need_drop_invite_link) {
ChatFull *chat_full = get_chat_full_force(chat_id, "on_update_chat_status");
if (chat_full != nullptr) {
on_update_chat_full_invite_link(chat_full, nullptr);
update_chat_full(chat_full, chat_id);
}
}
if (need_reload_group_call) {
send_closure_later(G()->messages_manager(), &MessagesManager::on_update_dialog_group_call_rights,
@ -12927,10 +12936,17 @@ void ContactsManager::on_channel_status_changed(Channel *c, ChannelId channel_id
const DialogParticipantStatus &new_status) {
CHECK(c->is_update_supergroup_sent);
bool need_drop_invite_link = old_status.is_administrator() != new_status.is_administrator() ||
old_status.is_member() != new_status.is_member();
bool need_reload_group_call = old_status.can_manage_calls() != new_status.can_manage_calls();
invalidate_channel_full(channel_id, need_drop_invite_link, !c->is_slow_mode_enabled);
if (old_status.can_manage_invite_links() && !new_status.can_manage_invite_links()) {
auto channel_full = get_channel_full_force(channel_id, "on_channel_status_changed");
if (channel_full != nullptr) {
on_update_channel_full_invite_link(channel_full, nullptr);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
update_channel_full(channel_full, channel_id);
}
} else {
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
}
if (old_status.is_creator() != new_status.is_creator()) {
for (size_t i = 0; i < 2; i++) {
@ -12942,6 +12958,10 @@ void ContactsManager::on_channel_status_changed(Channel *c, ChannelId channel_id
reload_dialog_administrators(DialogId(channel_id), 0, Auto());
remove_dialog_suggested_action(SuggestedAction{SuggestedAction::Type::ConvertToGigagroup, DialogId(channel_id)});
}
if (old_status.is_member() != new_status.is_member() || new_status.is_banned()) {
remove_dialog_access_by_invite_link(DialogId(channel_id));
}
if (need_reload_group_call) {
send_closure_later(G()->messages_manager(), &MessagesManager::on_update_dialog_group_call_rights,
DialogId(channel_id));
@ -12991,7 +13011,7 @@ void ContactsManager::on_channel_username_changed(Channel *c, ChannelId channel_
const string &new_username) {
if (old_username.empty() || new_username.empty()) {
// moving channel from private to public can change availability of chat members
invalidate_channel_full(channel_id, true, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
}
}
@ -13841,8 +13861,7 @@ bool ContactsManager::is_chat_full_outdated(const ChatFull *chat_full, const Cha
}
}
if (c->is_active && c->status.is_administrator() && c->status.can_invite_users() &&
!chat_full->invite_link.is_valid()) {
if (c->is_active && c->status.can_manage_invite_links() && !chat_full->invite_link.is_valid()) {
LOG(INFO) << "Have outdated invite link in " << chat_id;
return true;
}
@ -15254,7 +15273,7 @@ void ContactsManager::on_chat_update(telegram_api::channel &channel, const char
c->is_gigagroup = is_gigagroup;
c->is_changed = true;
invalidate_channel_full(channel_id, false, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
}
if (c->is_verified != is_verified || c->sign_messages != sign_messages) {
c->is_verified = is_verified;
@ -15345,7 +15364,7 @@ void ContactsManager::on_chat_update(telegram_api::channel &channel, const char
}
if (need_invalidate_channel_full) {
invalidate_channel_full(channel_id, false, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
}
bool has_active_group_call = (channel.flags_ & CHANNEL_FLAG_HAS_ACTIVE_GROUP_CALL) != 0;
@ -15454,7 +15473,7 @@ void ContactsManager::on_chat_update(telegram_api::channelForbidden &channel, co
}
}
if (need_invalidate_channel_full) {
invalidate_channel_full(channel_id, false, !c->is_slow_mode_enabled);
invalidate_channel_full(channel_id, !c->is_slow_mode_enabled);
}
}

View File

@ -237,7 +237,7 @@ class ContactsManager : public Actor {
void speculative_delete_channel_participant(ChannelId channel_id, UserId deleted_user_id, bool by_me);
void invalidate_channel_full(ChannelId channel_id, bool need_drop_invite_link, bool need_drop_slow_mode_delay);
void invalidate_channel_full(ChannelId channel_id, bool need_drop_slow_mode_delay);
bool on_get_channel_error(ChannelId channel_id, const Status &status, const string &source);

View File

@ -235,6 +235,11 @@ class DialogParticipantStatus {
return (flags_ & CAN_INVITE_USERS_ADMIN) != 0 || (flags_ & CAN_INVITE_USERS_BANNED) != 0;
}
bool can_manage_invite_links() const {
// invite links can be managed, only if administrator was explicitly granted the right
return (flags_ & CAN_INVITE_USERS_ADMIN) != 0;
}
bool can_restrict_members() const {
return (flags_ & CAN_RESTRICT_MEMBERS) != 0;
}

View File

@ -655,6 +655,7 @@ struct GroupCallManager::GroupCall {
DialogId as_dialog_id;
int32 version = -1;
int32 leave_version = -1;
int32 title_version = -1;
int32 mute_version = -1;
int32 stream_dc_id_version = -1;
@ -1445,10 +1446,12 @@ void GroupCallManager::on_update_group_call_participants(
continue;
}
if (participant.joined_date == 0) {
diff--;
if (version > group_call->leave_version) {
diff--;
}
remove_recent_group_call_speaker(input_group_call_id, participant.dialog_id);
} else {
if (participant.is_just_joined) {
if (participant.is_just_joined && version >= group_call->leave_version) {
diff++;
}
on_participant_speaking_in_group_call(input_group_call_id, participant);
@ -1521,7 +1524,7 @@ void GroupCallManager::on_update_group_call_participants(
}
auto dialog_id = participant.dialog_id;
if (dialog_id.get_type() != DialogType::User && participant.joined_date != 0) {
td_->messages_manager_->force_create_dialog(dialog_id, "on_update_group_call_participants 2");
td_->messages_manager_->force_create_dialog(dialog_id, "on_update_group_call_participants 2", true);
}
if (GroupCallParticipant::is_versioned_update(group_call_participant)) {
@ -1718,7 +1721,7 @@ void GroupCallManager::process_group_call_participants(
continue;
}
if (participant.dialog_id.get_type() != DialogType::User) {
td_->messages_manager_->force_create_dialog(participant.dialog_id, "process_group_call_participants");
td_->messages_manager_->force_create_dialog(participant.dialog_id, "process_group_call_participants", true);
}
on_participant_speaking_in_group_call(input_group_call_id, participant);
@ -1752,7 +1755,7 @@ void GroupCallManager::process_group_call_participants(
continue;
}
if (participant.dialog_id.get_type() != DialogType::User) {
td_->messages_manager_->force_create_dialog(participant.dialog_id, "process_group_call_participants");
td_->messages_manager_->force_create_dialog(participant.dialog_id, "process_group_call_participants", true);
}
if (is_load) {
@ -3326,6 +3329,7 @@ void GroupCallManager::try_clear_group_call_participants(InputGroupCallId input_
group_call->loaded_all_participants = false;
send_update_group_call(group_call, "try_clear_group_call_participants");
}
group_call->leave_version = group_call->version;
group_call->version = -1;
for (auto &participant : participants->participants) {
@ -3627,7 +3631,7 @@ void GroupCallManager::on_user_speaking_in_group_call(GroupCallId group_call_id,
for (size_t i = 0; i <= recent_speakers->users.size(); i++) {
if (i == recent_speakers->users.size() || recent_speakers->users[i].second <= date) {
if (dialog_id.get_type() != DialogType::User) {
td_->messages_manager_->force_create_dialog(dialog_id, "on_user_speaking_in_group_call");
td_->messages_manager_->force_create_dialog(dialog_id, "on_user_speaking_in_group_call", true);
}
recent_speakers->users.insert(recent_speakers->users.begin() + i, {dialog_id, date});
break;

View File

@ -10,6 +10,7 @@
#include "td/telegram/DialogParticipant.h"
#include "td/telegram/GroupCallId.h"
#include "td/telegram/GroupCallParticipant.h"
#include "td/telegram/GroupCallParticipantOrder.h"
#include "td/telegram/InputGroupCallId.h"
#include "td/telegram/td_api.h"
#include "td/telegram/telegram_api.h"

View File

@ -12,6 +12,8 @@
#include "td/utils/logging.h"
#include <limits>
namespace td {
GroupCallParticipant::GroupCallParticipant(const tl_object_ptr<telegram_api::groupCallParticipant> &participant,
@ -245,7 +247,7 @@ td_api::object_ptr<td_api::groupCallParticipant> GroupCallParticipant::get_group
}
return td_api::make_object<td_api::groupCallParticipant>(
td->messages_manager_->get_message_sender_object(dialog_id), audio_source, about, is_speaking,
td->messages_manager_->get_message_sender_object(dialog_id), audio_source, about, is_self, is_speaking,
get_is_hand_raised(), can_be_muted_for_all_users, can_be_unmuted_for_all_users, can_be_muted_only_for_self,
can_be_unmuted_only_for_self, get_is_muted_for_all_users(), get_is_muted_locally(), get_is_muted_by_themselves(),
get_volume_level(), order.get_group_call_participant_order_object());
@ -253,7 +255,8 @@ td_api::object_ptr<td_api::groupCallParticipant> GroupCallParticipant::get_group
bool operator==(const GroupCallParticipant &lhs, const GroupCallParticipant &rhs) {
return lhs.dialog_id == rhs.dialog_id && lhs.audio_source == rhs.audio_source && lhs.about == rhs.about &&
lhs.is_speaking == rhs.is_speaking && lhs.get_is_hand_raised() == rhs.get_is_hand_raised() &&
lhs.is_self == rhs.is_self && lhs.is_speaking == rhs.is_speaking &&
lhs.get_is_hand_raised() == rhs.get_is_hand_raised() &&
lhs.can_be_muted_for_all_users == rhs.can_be_muted_for_all_users &&
lhs.can_be_unmuted_for_all_users == rhs.can_be_unmuted_for_all_users &&
lhs.can_be_muted_only_for_self == rhs.can_be_muted_only_for_self &&

View File

@ -1126,7 +1126,8 @@ vector<Slice> find_mentions(Slice str) {
if (mention.size() >= 5) {
return false;
}
return get_valid_short_usernames().count(mention) == 0;
auto lowered_mention = to_lower(mention);
return get_valid_short_usernames().count(lowered_mention) == 0;
});
return mentions;
}

View File

@ -7394,7 +7394,7 @@ void MessagesManager::on_user_dialog_action(DialogId dialog_id, MessageId top_th
const Dialog *d = get_dialog_force(dialog_id);
if (d != nullptr && d->active_group_call_id.is_valid()) {
auto group_call_id = td_->group_call_manager_->get_group_call_id(d->active_group_call_id, dialog_id);
td_->group_call_manager_->on_user_speaking_in_group_call(group_call_id, dialog_id, date);
td_->group_call_manager_->on_user_speaking_in_group_call(group_call_id, typing_dialog_id, date);
}
return;
}
@ -30838,7 +30838,7 @@ void MessagesManager::send_dialog_action(DialogId dialog_id, MessageId top_threa
tl_object_ptr<telegram_api::InputPeer> input_peer;
if (action == DialogAction::get_speaking_action()) {
input_peer = td_->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
input_peer = get_input_peer(dialog_id, AccessRights::Read);
if (input_peer == nullptr) {
return promise.set_error(Status::Error(400, "Have no access to the chat"));
}
@ -30855,7 +30855,7 @@ void MessagesManager::send_dialog_action(DialogId dialog_id, MessageId top_threa
return promise.set_value(Unit());
}
input_peer = td_->messages_manager_->get_input_peer(dialog_id, AccessRights::Write);
input_peer = get_input_peer(dialog_id, AccessRights::Write);
}
if (dialog_id.get_type() == DialogType::SecretChat) {
@ -34090,7 +34090,8 @@ void MessagesManager::force_create_dialog(DialogId dialog_id, const char *source
d = add_dialog(dialog_id);
update_dialog_pos(d, "force_create_dialog");
if (dialog_id.get_type() == DialogType::SecretChat && !d->notification_settings.is_synchronized) {
if (dialog_id.get_type() == DialogType::SecretChat && !d->notification_settings.is_synchronized &&
td_->contacts_manager_->get_secret_chat_state(dialog_id.get_secret_chat_id()) != SecretChatState::Closed) {
// secret chat is being created
// let's copy notification settings from main chat if available
VLOG(notifications) << "Create new secret " << dialog_id << " from " << source;
@ -35312,7 +35313,7 @@ unique_ptr<MessagesManager::Dialog> MessagesManager::parse_dialog(DialogId dialo
Dependencies dependencies;
add_dialog_dependencies(dependencies, dialog_id);
if (d->default_join_group_call_as_dialog_id != dialog_id) {
add_dialog_and_dependencies(dependencies, d->default_join_group_call_as_dialog_id);
add_message_sender_dependencies(dependencies, d->default_join_group_call_as_dialog_id);
}
if (d->messages != nullptr) {
add_message_dependencies(dependencies, dialog_id, d->messages.get());

View File

@ -570,12 +570,12 @@ class TestProxyRequest : public RequestOnceActor {
set_timeout_in(timeout_);
promise_ = std::move(promise);
IPAddress ip;
auto status = ip.init_host_port(proxy_.server(), proxy_.port());
IPAddress ip_address;
auto status = ip_address.init_host_port(proxy_.server(), proxy_.port());
if (status.is_error()) {
return promise_.set_error(Status::Error(400, status.public_message()));
}
auto r_socket_fd = SocketFd::open(ip);
auto r_socket_fd = SocketFd::open(ip_address);
if (r_socket_fd.is_error()) {
return promise_.set_error(Status::Error(400, r_socket_fd.error().public_message()));
}
@ -594,9 +594,9 @@ class TestProxyRequest : public RequestOnceActor {
send_closure(actor_id, &TestProxyRequest::on_connection_data, std::move(r_data));
});
child_ =
ConnectionCreator::prepare_connection(r_socket_fd.move_as_ok(), proxy_, mtproto_ip_address, get_transport(),
"Test", "TestPingDC2", nullptr, {}, false, std::move(connection_promise));
child_ = ConnectionCreator::prepare_connection(ip_address, r_socket_fd.move_as_ok(), proxy_, mtproto_ip_address,
get_transport(), "Test", "TestPingDC2", nullptr, {}, false,
std::move(connection_promise));
}
void on_connection_data(Result<ConnectionCreator::ConnectionData> r_data) {
@ -617,7 +617,8 @@ class TestProxyRequest : public RequestOnceActor {
};
auto handshake = make_unique<mtproto::AuthKeyHandshake>(dc_id_, 3600);
auto data = r_data.move_as_ok();
auto raw_connection = make_unique<mtproto::RawConnection>(std::move(data.socket_fd), get_transport(), nullptr);
auto raw_connection =
mtproto::RawConnection::create(data.ip_address, std::move(data.socket_fd), get_transport(), nullptr);
child_ = create_actor<mtproto::HandshakeActor>(
"HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique<HandshakeContext>(), 10.0,
PromiseCreator::lambda([actor_id = actor_id(this)](Result<unique_ptr<mtproto::RawConnection>> raw_connection) {

View File

@ -2213,7 +2213,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelTooLong>
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannel> update, Promise<Unit> &&promise) {
td_->contacts_manager_->invalidate_channel_full(ChannelId(update->channel_id_), false, false);
td_->contacts_manager_->invalidate_channel_full(ChannelId(update->channel_id_), false);
promise.set_value(Unit());
}

View File

@ -323,18 +323,20 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise<double> promise) {
continue;
}
auto r_socket_fd = SocketFd::open(info.option->get_ip_address());
auto ip_address = info.option->get_ip_address();
auto r_socket_fd = SocketFd::open(ip_address);
if (r_socket_fd.is_error()) {
LOG(DEBUG) << "Failed to open socket: " << r_socket_fd.error();
on_ping_main_dc_result(token, r_socket_fd.move_as_error());
continue;
}
ping_proxy_socket_fd(
r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(),
PromiseCreator::lambda([actor_id = actor_id(this), token](Result<double> result) {
send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, std::move(result));
}));
ping_proxy_socket_fd(std::move(ip_address), r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(),
PSTRING() << info.option->get_ip_address(),
PromiseCreator::lambda([actor_id = actor_id(this), token](Result<double> result) {
send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token,
std::move(result));
}));
}
return;
}
@ -371,28 +373,31 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address
auto socket_fd = r_socket_fd.move_as_ok();
auto connection_promise = PromiseCreator::lambda(
[promise = std::move(promise), actor_id = actor_id(this), transport_type = extra.transport_type,
[ip_address, promise = std::move(promise), actor_id = actor_id(this), transport_type = extra.transport_type,
debug_str = std::move(extra.debug_str)](Result<ConnectionData> r_connection_data) mutable {
if (r_connection_data.is_error()) {
return promise.set_error(Status::Error(400, r_connection_data.error().public_message()));
}
send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, r_connection_data.move_as_ok().socket_fd,
std::move(transport_type), std::move(debug_str), std::move(promise));
send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, ip_address,
r_connection_data.move_as_ok().socket_fd, std::move(transport_type), std::move(debug_str),
std::move(promise));
});
CHECK(proxy.use_proxy());
auto token = next_token();
auto ref =
prepare_connection(std::move(socket_fd), proxy, extra.mtproto_ip_address, extra.transport_type, "Ping",
extra.debug_str, nullptr, create_reference(token), false, std::move(connection_promise));
auto ref = prepare_connection(extra.ip_address, std::move(socket_fd), proxy, extra.mtproto_ip_address,
extra.transport_type, "Ping", extra.debug_str, nullptr, create_reference(token), false,
std::move(connection_promise));
if (!ref.empty()) {
children_[token] = {false, std::move(ref)};
}
}
void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type,
string debug_str, Promise<double> promise) {
void ConnectionCreator::ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd,
mtproto::TransportType transport_type, string debug_str,
Promise<double> promise) {
auto token = next_token();
auto raw_connection = make_unique<mtproto::RawConnection>(std::move(socket_fd), std::move(transport_type), nullptr);
auto raw_connection =
mtproto::RawConnection::create(ip_address, std::move(socket_fd), std::move(transport_type), nullptr);
children_[token] = {
false, create_ping_actor(std::move(debug_str), std::move(raw_connection), nullptr,
PromiseCreator::lambda([promise = std::move(promise)](
@ -400,7 +405,7 @@ void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::Transp
if (result.is_error()) {
return promise.set_error(Status::Error(400, result.error().public_message()));
}
auto ping_time = result.ok()->rtt_;
auto ping_time = result.ok()->extra().rtt;
promise.set_value(std::move(ping_time));
}),
create_reference(token))};
@ -643,20 +648,20 @@ void ConnectionCreator::request_raw_connection_by_ip(IPAddress ip_address, mtpro
}
auto socket_fd = r_socket_fd.move_as_ok();
auto connection_promise = PromiseCreator::lambda(
[promise = std::move(promise), actor_id = actor_id(this), transport_type,
network_generation = network_generation_](Result<ConnectionData> r_connection_data) mutable {
if (r_connection_data.is_error()) {
return promise.set_error(Status::Error(400, r_connection_data.error().public_message()));
}
auto raw_connection =
make_unique<mtproto::RawConnection>(r_connection_data.move_as_ok().socket_fd, transport_type, nullptr);
raw_connection->extra_ = network_generation;
promise.set_value(std::move(raw_connection));
});
auto connection_promise = PromiseCreator::lambda([promise = std::move(promise), actor_id = actor_id(this),
transport_type, network_generation = network_generation_,
ip_address](Result<ConnectionData> r_connection_data) mutable {
if (r_connection_data.is_error()) {
return promise.set_error(Status::Error(400, r_connection_data.error().public_message()));
}
auto raw_connection =
mtproto::RawConnection::create(ip_address, r_connection_data.move_as_ok().socket_fd, transport_type, nullptr);
raw_connection->extra().extra = network_generation;
promise.set_value(std::move(raw_connection));
});
auto token = next_token();
auto ref = prepare_connection(std::move(socket_fd), Proxy(), IPAddress(), transport_type, "Raw",
auto ref = prepare_connection(ip_address, std::move(socket_fd), Proxy(), IPAddress(), transport_type, "Raw",
PSTRING() << "to IP address " << ip_address, nullptr, create_reference(token), false,
std::move(connection_promise));
if (!ref.empty()) {
@ -699,6 +704,9 @@ Result<SocketFd> ConnectionCreator::find_connection(const Proxy &proxy, const IP
bool prefer_ipv6 =
G()->shared_config().get_option_boolean("prefer_ipv6") || (proxy.use_proxy() && proxy_ip_address.is_ipv6());
bool only_http = proxy.use_http_caching_proxy();
#if TD_EXPERIMENTAL_WATCH_OS
only_http = true;
#endif
TRY_RESULT(info, dc_options_set_.find_connection(
dc_id, allow_media_only, proxy.use_proxy() && proxy.use_socks5_proxy(), prefer_ipv6, only_http));
extra.stat = info.stat;
@ -718,29 +726,33 @@ Result<SocketFd> ConnectionCreator::find_connection(const Proxy &proxy, const IP
if (proxy.use_proxy()) {
extra.mtproto_ip_address = info.option->get_ip_address();
extra.ip_address = proxy_ip_address;
extra.debug_str = PSTRING() << (proxy.use_socks5_proxy() ? "Socks5" : (only_http ? "HTTP_ONLY" : "HTTP_TCP")) << ' '
<< proxy_ip_address << " --> " << extra.mtproto_ip_address << extra.debug_str;
VLOG(connections) << "Create: " << extra.debug_str;
return SocketFd::open(proxy_ip_address);
} else {
extra.ip_address = info.option->get_ip_address();
extra.debug_str = PSTRING() << info.option->get_ip_address() << extra.debug_str;
VLOG(connections) << "Create: " << extra.debug_str;
return SocketFd::open(info.option->get_ip_address());
}
VLOG(connections) << "Create: " << extra.debug_str;
return SocketFd::open(extra.ip_address);
}
ActorOwn<> ConnectionCreator::prepare_connection(
SocketFd socket_fd, const Proxy &proxy, const IPAddress &mtproto_ip_address, mtproto::TransportType transport_type,
Slice actor_name_prefix, Slice debug_str, unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback,
ActorShared<> parent, bool use_connection_token, Promise<ConnectionData> promise) {
ActorOwn<> ConnectionCreator::prepare_connection(IPAddress ip_address, SocketFd socket_fd, const Proxy &proxy,
const IPAddress &mtproto_ip_address,
mtproto::TransportType transport_type, Slice actor_name_prefix,
Slice debug_str,
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback,
ActorShared<> parent, bool use_connection_token,
Promise<ConnectionData> promise) {
if (proxy.use_socks5_proxy() || proxy.use_http_tcp_proxy() || transport_type.secret.emulate_tls()) {
VLOG(connections) << "Create new transparent proxy connection " << debug_str;
class Callback : public TransparentProxy::Callback {
public:
explicit Callback(Promise<ConnectionData> promise,
explicit Callback(Promise<ConnectionData> promise, IPAddress ip_address,
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback, bool use_connection_token,
bool was_connected)
: promise_(std::move(promise))
, ip_address_(std::move(ip_address))
, stats_callback_(std::move(stats_callback))
, use_connection_token_(use_connection_token)
, was_connected_(was_connected) {
@ -756,6 +768,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(
promise_.set_error(Status::Error(400, result.error().public_message()));
} else {
ConnectionData data;
data.ip_address = ip_address_;
data.socket_fd = result.move_as_ok();
data.connection_token = std::move(connection_token_);
data.stats_callback = std::move(stats_callback_);
@ -772,6 +785,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(
private:
Promise<ConnectionData> promise_;
StateManager::ConnectionToken connection_token_;
IPAddress ip_address_;
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback_;
bool use_connection_token_;
bool was_connected_{false};
@ -779,8 +793,8 @@ ActorOwn<> ConnectionCreator::prepare_connection(
VLOG(connections) << "Start "
<< (proxy.use_socks5_proxy() ? "Socks5" : (proxy.use_http_tcp_proxy() ? "HTTP" : "TLS")) << ": "
<< debug_str;
auto callback = make_unique<Callback>(std::move(promise), std::move(stats_callback), use_connection_token,
!proxy.use_socks5_proxy());
auto callback = make_unique<Callback>(std::move(promise), ip_address, std::move(stats_callback),
use_connection_token, !proxy.use_socks5_proxy());
if (proxy.use_socks5_proxy()) {
return ActorOwn<>(create_actor<Socks5>(PSLICE() << actor_name_prefix << "Socks5", std::move(socket_fd),
mtproto_ip_address, proxy.user().str(), proxy.password().str(),
@ -801,6 +815,7 @@ ActorOwn<> ConnectionCreator::prepare_connection(
VLOG(connections) << "Create new direct connection " << debug_str;
ConnectionData data;
data.ip_address = ip_address;
data.socket_fd = std::move(socket_fd);
data.stats_callback = std::move(stats_callback);
promise.set_result(std::move(data));
@ -933,9 +948,9 @@ void ConnectionCreator::client_loop(ClientInfo &client) {
td::make_unique<detail::StatsCallback>(client.is_media ? media_net_stats_callback_ : common_net_stats_callback_,
actor_id(this), client.hash, extra.stat);
auto token = next_token();
auto ref = prepare_connection(std::move(socket_fd), proxy, extra.mtproto_ip_address, extra.transport_type, Slice(),
extra.debug_str, std::move(stats_callback), create_reference(token), true,
std::move(promise));
auto ref = prepare_connection(extra.ip_address, std::move(socket_fd), proxy, extra.mtproto_ip_address,
extra.transport_type, Slice(), extra.debug_str, std::move(stats_callback),
create_reference(token), true, std::move(promise));
if (!ref.empty()) {
children_[token] = {true, std::move(ref)};
}
@ -963,7 +978,7 @@ void ConnectionCreator::client_create_raw_connection(Result<ConnectionData> r_co
debug_str](Result<unique_ptr<mtproto::RawConnection>> result) mutable {
if (result.is_ok()) {
VLOG(connections) << "Ready connection (" << (check_mode ? "" : "un") << "checked) " << result.ok().get() << ' '
<< tag("rtt", format::as_time(result.ok()->rtt_)) << ' ' << debug_str;
<< tag("rtt", format::as_time(result.ok()->extra().rtt)) << ' ' << debug_str;
} else {
VLOG(connections) << "Failed connection (" << (check_mode ? "" : "un") << "checked) " << result.error() << ' '
<< debug_str;
@ -977,12 +992,13 @@ void ConnectionCreator::client_create_raw_connection(Result<ConnectionData> r_co
}
auto connection_data = r_connection_data.move_as_ok();
auto raw_connection = make_unique<mtproto::RawConnection>(
std::move(connection_data.socket_fd), std::move(transport_type), std::move(connection_data.stats_callback));
auto raw_connection =
mtproto::RawConnection::create(connection_data.ip_address, std::move(connection_data.socket_fd),
std::move(transport_type), std::move(connection_data.stats_callback));
raw_connection->set_connection_token(std::move(connection_data.connection_token));
raw_connection->extra_ = network_generation;
raw_connection->debug_str_ = debug_str;
raw_connection->extra().extra = network_generation;
raw_connection->extra().debug_str = debug_str;
if (check_mode) {
VLOG(connections) << "Start check: " << debug_str << " " << (auth_data ? "with" : "without") << " auth data";

View File

@ -84,6 +84,7 @@ class ConnectionCreator : public NetQueryCallback {
void ping_proxy(int32 proxy_id, Promise<double> promise);
struct ConnectionData {
IPAddress ip_address;
SocketFd socket_fd;
StateManager::ConnectionToken connection_token;
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback;
@ -91,8 +92,9 @@ class ConnectionCreator : public NetQueryCallback {
static DcOptions get_default_dc_options(bool is_test);
static ActorOwn<> prepare_connection(SocketFd socket_fd, const Proxy &proxy, const IPAddress &mtproto_ip_address,
mtproto::TransportType transport_type, Slice actor_name_prefix, Slice debug_str,
static ActorOwn<> prepare_connection(IPAddress ip_address, SocketFd socket_fd, const Proxy &proxy,
const IPAddress &mtproto_ip_address, mtproto::TransportType transport_type,
Slice actor_name_prefix, Slice debug_str,
unique_ptr<mtproto::RawConnection::StatsCallback> stats_callback,
ActorShared<> parent, bool use_connection_token,
Promise<ConnectionData> promise);
@ -232,6 +234,7 @@ class ConnectionCreator : public NetQueryCallback {
DcOptionsSet::Stat *stat{nullptr};
mtproto::TransportType transport_type;
string debug_str;
IPAddress ip_address;
IPAddress mtproto_ip_address;
bool check_mode{false};
};
@ -246,8 +249,8 @@ class ConnectionCreator : public NetQueryCallback {
void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise<double> promise);
void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, string debug_str,
Promise<double> promise);
void ping_proxy_socket_fd(IPAddress ip_address, SocketFd socket_fd, mtproto::TransportType transport_type,
string debug_str, Promise<double> promise);
void on_ping_main_dc_result(uint64 token, Result<double> result);
};

View File

@ -110,7 +110,7 @@ class GenAuthKeyActor : public Actor {
auto raw_connection = r_raw_connection.move_as_ok();
VLOG(dc) << "Receive raw connection " << raw_connection.get();
network_generation_ = raw_connection->extra_;
network_generation_ = raw_connection->extra().extra;
child_ = create_actor_on_scheduler<mtproto::HandshakeActor>(
PSLICE() << name_ + "::HandshakeActor", G()->get_slow_net_scheduler_id(), std::move(handshake_),
std::move(raw_connection), std::move(context_), 10, std::move(connection_promise_),
@ -1052,7 +1052,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
auto raw_connection = r_raw_connection.move_as_ok();
VLOG(dc) << "Receive raw connection " << raw_connection.get();
if (raw_connection->extra_ != network_generation_) {
if (raw_connection->extra().extra != network_generation_) {
LOG(WARNING) << "Got RawConnection with old network_generation";
info->state = ConnectionInfo::State::Empty;
yield();
@ -1087,7 +1087,7 @@ void Session::connection_open_finish(ConnectionInfo *info,
mode_name = Slice("HttpLongPoll");
}
}
auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->debug_str_;
auto name = PSTRING() << get_name() << "::Connect::" << mode_name << "::" << raw_connection->extra().debug_str;
LOG(INFO) << "Finished to open connection " << name;
info->connection = make_unique<mtproto::SessionConnection>(mode, std::move(raw_connection), &auth_data_);
if (can_destroy_auth_key()) {

View File

@ -46,6 +46,15 @@ set(TDNET_SOURCE
td/net/Wget.h
)
if (TD_EXPERIMENTAL_WATCH_OS)
set(TDNET_SOURCE
${TDNET_SOURCE}
td/net/DarwinHttp.mm
td/net/DarwinHttp.h
)
set_source_files_properties(td/net/DarwinHttp.mm PROPERTIES COMPILE_FLAGS -fobjc-arc)
endif()
#RULES
#LIBRARIES
@ -66,6 +75,11 @@ if (WIN32)
endif()
endif()
if (TD_EXPERIMENTAL_WATCH_OS)
find_library(FOUNDATION_LIBRARY Foundation REQUIRED)
target_link_libraries(tdnet PRIVATE ${FOUNDATION_LIBRARY})
endif()
install(TARGETS tdnet EXPORT TdTargets
LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"

22
tdnet/td/net/DarwinHttp.h Normal file
View File

@ -0,0 +1,22 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/PromiseFuture.h"
#include "td/utils/buffer.h"
#include "td/utils/Slice.h"
namespace td {
class DarwinHttp {
public:
static void get(CSlice url, Promise<BufferSlice> promise);
static void post(CSlice url, Slice data, Promise<BufferSlice> promise);
};
} // namespace td

View File

@ -0,0 +1,67 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/net/DarwinHttp.h"
#include "td/utils/logging.h"
#import <Foundation/Foundation.h>
namespace td {
namespace {
NSString *to_ns_string(CSlice slice) {
return [NSString stringWithUTF8String:slice.c_str()];
}
NSData *to_ns_data(Slice data) {
return [NSData dataWithBytes:static_cast<const void *>(data.data()) length:data.size()];
}
auto http_get(CSlice url) {
auto nsurl = [NSURL URLWithString:to_ns_string(url)];
auto request = [NSURLRequest requestWithURL:nsurl];
return request;
}
auto http_post(CSlice url, Slice data) {
auto nsurl = [NSURL URLWithString:to_ns_string(url)];
auto request = [NSMutableURLRequest requestWithURL:nsurl];
[request setHTTPMethod:@"POST"];
[request setHTTPBody:to_ns_data(data)];
[request setValue:@"keep-alive" forHTTPHeaderField:@"Connection"];
[request setValue:@"" forHTTPHeaderField:@"Host"];
[request setValue:to_ns_string(PSLICE() << data.size()) forHTTPHeaderField:@"Content-Length"];
[request setValue:@"application/x-www-form-urlencoded" forHTTPHeaderField:@"Content-Type"];
return request;
}
void http_send(NSURLRequest *request, Promise<BufferSlice> promise) {
__block auto callback = std::move(promise);
NSURLSessionDataTask *dataTask =
[NSURLSession.sharedSession
dataTaskWithRequest:request
completionHandler:
^(NSData *data, NSURLResponse *response, NSError *error) {
if (error == nil) {
callback(BufferSlice(Slice((const char *)([data bytes]), [data length])));
} else {
callback(Status::Error(static_cast<int32>([error code]), "HTTP request failed"));
}
}];
[dataTask resume];
}
} // namespace
void DarwinHttp::get(CSlice url, Promise<BufferSlice> promise) {
return http_send(http_get(url), std::move(promise));
}
void DarwinHttp::post(CSlice url, Slice data, Promise<BufferSlice> promise) {
return http_send(http_post(url, data), std::move(promise));
}
} // namespace td

View File

@ -87,7 +87,7 @@ Result<size_t> HttpReader::read_next(HttpQuery *query, bool can_be_slow) {
gzip_flow_ = GzipByteFlow(Gzip::Mode::Decode);
GzipByteFlow::Options options;
options.write_watermark.low = 0;
options.write_watermark.high = max(max_post_size_, static_cast<size_t>(1 << 16));
options.write_watermark.high = max(max_post_size_, MAX_TOTAL_PARAMETERS_LENGTH + 1);
gzip_flow_.set_options(options);
gzip_flow_.set_max_output_size(MAX_CONTENT_SIZE);
*source >> gzip_flow_;
@ -270,7 +270,7 @@ Result<bool> HttpReader::parse_multipart_form_data(bool can_be_slow) {
return Status::Error(431, "Request Header Fields Too Large: total headers size exceeded");
}
if (form_data_read_length_ == 0) {
// there is no headers at all
// there are no headers at all
return Status::Error(400, "Bad Request: headers in multipart/form-data are empty");
}
@ -324,24 +324,58 @@ Result<bool> HttpReader::parse_multipart_form_data(bool can_be_slow) {
break;
}
size_t key_size = key_end - header_value.data();
auto key = header_value.substr(0, key_size);
key = trim(key);
auto key = trim(header_value.substr(0, key_size));
header_value.remove_prefix(key_size + 1);
const char *value_end =
static_cast<const char *>(std::memchr(header_value.data(), ';', header_value.size()));
size_t value_size;
if (value_end == nullptr) {
value_size = header_value.size();
} else {
value_size = value_end - header_value.data();
while (!header_value.empty() && is_space(header_value[0])) {
header_value.remove_prefix(1);
}
auto value = header_value.substr(0, value_size);
value = trim(value);
if (value.size() > 1u && value[0] == '"' && value.back() == '"') {
value = {value.data() + 1, value.size() - 2};
MutableSlice value;
if (!header_value.empty() && header_value[0] == '"') { // quoted-string
char *value_end = header_value.data() + 1;
const char *pos = value_end;
while (true) {
if (pos == header_value.data() + header_value.size()) {
return Status::Error(400, "Bad Request: unclosed quoted string in Content-Disposition header");
}
char c = *pos++;
if (c == '"') {
break;
}
if (c == '\\') {
if (pos == header_value.data() + header_value.size()) {
return Status::Error(400, "Bad Request: wrong escape sequence in Content-Disposition header");
}
c = *pos++;
}
*value_end++ = c;
}
value = header_value.substr(1, value_end - header_value.data() - 1);
header_value.remove_prefix(pos - header_value.data());
while (!header_value.empty() && is_space(header_value[0])) {
header_value.remove_prefix(1);
}
if (!header_value.empty()) {
if (header_value[0] != ';') {
return Status::Error(400, "Bad Request: expected ';' in Content-Disposition header");
}
header_value.remove_prefix(1);
}
} else { // token
auto value_end =
static_cast<const char *>(std::memchr(header_value.data(), ';', header_value.size()));
if (value_end != nullptr) {
auto value_size = static_cast<size_t>(value_end - header_value.data());
value = trim(header_value.substr(0, value_size));
header_value.remove_prefix(value_size + 1);
} else {
value = trim(header_value);
header_value = MutableSlice();
}
}
header_value.remove_prefix(value_size + (header_value.size() > value_size));
if (key == "name") {
field_name_ = value;

View File

@ -6,3 +6,4 @@
#cmakedefine01 TD_HAVE_COROUTINES
#cmakedefine01 TD_HAVE_ABSL
#cmakedefine01 TD_FD_DEBUG
#cmakedefine01 TD_EXPERIMENTAL_WATCH_OS

View File

@ -499,6 +499,9 @@ Status IPAddress::init_sockaddr(sockaddr *addr, socklen_t len) {
Status IPAddress::init_socket_address(const SocketFd &socket_fd) {
is_valid_ = false;
if (socket_fd.empty()) {
return Status::Error("Socket is empty");
}
auto socket = socket_fd.get_native_fd().socket();
socklen_t len = storage_size();
int ret = getsockname(socket, &sockaddr_, &len);
@ -511,6 +514,9 @@ Status IPAddress::init_socket_address(const SocketFd &socket_fd) {
Status IPAddress::init_peer_address(const SocketFd &socket_fd) {
is_valid_ = false;
if (socket_fd.empty()) {
return Status::Error("Socket is empty");
}
auto socket = socket_fd.get_native_fd().socket();
socklen_t len = storage_size();
int ret = getpeername(socket, &sockaddr_, &len);

View File

@ -590,6 +590,10 @@ Result<SocketFd> SocketFd::from_native_fd(NativeFd fd) {
}
Result<SocketFd> SocketFd::open(const IPAddress &address) {
#if TD_EXPERIMENTAL_WATCH_OS
return SocketFd{};
#endif
NativeFd native_fd{socket(address.get_address_family(), SOCK_STREAM, IPPROTO_TCP)};
if (!native_fd) {
return OS_SOCKET_ERROR("Failed to create a socket");

View File

@ -6,6 +6,10 @@
//
#include "data.h"
#if TD_EXPERIMENTAL_WATCH_OS
#include "td/net/DarwinHttp.h"
#endif
#include "td/net/HttpChunkedByteFlow.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpQuery.h"
@ -38,6 +42,9 @@
#include <algorithm>
#include <limits>
#include <condition_variable>
#include <mutex>
REGISTER_TESTS(http)
using namespace td;
@ -239,7 +246,6 @@ TEST(Http, gzip_bomb) {
}
TEST(Http, gzip) {
return;
auto gzip_str = gzdecode(base64url_decode(Slice(gzip, gzip_size)).ok()).as_slice().str();
td::ChainBufferWriter input_writer;
@ -462,3 +468,39 @@ TEST(Http, gzip_bomb_with_limit) {
}
ASSERT_TRUE(ok);
}
#if TD_EXPERIMENTAL_WATCH_OS
struct Baton {
std::mutex mutex;
std::condition_variable cond;
bool is_ready{false};
void wait() {
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [&] { return is_ready; });
}
void post() {
{
std::unique_lock<std::mutex> lock(mutex);
is_ready = true;
}
cond.notify_all();
}
void reset() {
is_ready = false;
}
};
TEST(Http, Darwin) {
Baton baton;
//LOG(ERROR) << "???";
td::DarwinHttp::get("http://example.com", [&](td::BufferSlice data) {
//LOG(ERROR) << data.as_slice();
baton.post();
});
//LOG(ERROR) << "!!!";
baton.wait();
}
#endif

View File

@ -44,8 +44,9 @@ TEST(MessageEntities, mention) {
check_mention("@abcdefghijklmnopqrstuvwxyz123456", {"@abcdefghijklmnopqrstuvwxyz123456"});
check_mention("@abcdefghijklmnopqrstuvwxyz1234567", {});
check_mention("нет@mention", {});
check_mention("@ya @gif @wiki @vid @bing @pic @bold @imdb @coub @like @vote @giff @cap ya cap @y @yar @bingg @bin",
{"@gif", "@wiki", "@vid", "@bing", "@pic", "@bold", "@imdb", "@coub", "@like", "@vote", "@bingg"});
check_mention(
"@ya @gif @wiki @vid @bing @pic @bold @imdb @ImDb @coub @like @vote @giff @cap ya cap @y @yar @bingg @bin",
{"@gif", "@wiki", "@vid", "@bing", "@pic", "@bold", "@imdb", "@ImDb", "@coub", "@like", "@vote", "@bingg"});
};
static void check_bot_command(const td::string &str, const td::vector<td::string> &expected) {

View File

@ -216,9 +216,9 @@ class TestPingActor : public Actor {
}
ping_connection_ = mtproto::PingConnection::create_req_pq(
make_unique<mtproto::RawConnection>(
r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()},
nullptr),
mtproto::RawConnection::create(ip_address_, r_socket.move_as_ok(),
mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()},
nullptr),
3);
Scheduler::subscribe(ping_connection_->get_poll_info().extract_pollable_fd(this));
@ -330,15 +330,16 @@ class HandshakeTestActor : public Actor {
}
void loop() override {
if (!wait_for_raw_connection_ && !raw_connection_) {
auto r_socket = SocketFd::open(get_default_ip_address());
auto ip_address = get_default_ip_address();
auto r_socket = SocketFd::open(ip_address);
if (r_socket.is_error()) {
finish(Status::Error(PSTRING() << "Failed to open socket: " << r_socket.error()));
return stop();
}
raw_connection_ = make_unique<mtproto::RawConnection>(
r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()},
nullptr);
raw_connection_ = mtproto::RawConnection::create(
ip_address, r_socket.move_as_ok(),
mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr);
}
if (!wait_for_handshake_ && !handshake_) {
handshake_ = make_unique<mtproto::AuthKeyHandshake>(dc_id_, 3600);
@ -535,14 +536,16 @@ class FastPingTestActor : public Actor {
void start_up() override {
// Run handshake to create key and salt
auto r_socket = SocketFd::open(get_default_ip_address());
auto ip_address = get_default_ip_address();
auto r_socket = SocketFd::open(ip_address);
if (r_socket.is_error()) {
*result_ = Status::Error(PSTRING() << "Failed to open socket: " << r_socket.error());
return stop();
}
auto raw_connection = make_unique<mtproto::RawConnection>(
r_socket.move_as_ok(), mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr);
auto raw_connection = mtproto::RawConnection::create(
ip_address, r_socket.move_as_ok(),
mtproto::TransportType{mtproto::TransportType::Tcp, 0, mtproto::ProxySecret()}, nullptr);
auto handshake = make_unique<mtproto::AuthKeyHandshake>(get_default_dc_id(), 60 * 100 /*temp*/);
create_actor<mtproto::HandshakeActor>(
"HandshakeActor", std::move(handshake), std::move(raw_connection), make_unique<HandshakeContext>(), 10.0,
@ -581,8 +584,8 @@ class FastPingTestActor : public Actor {
return stop();
}
connection_ = r_connection.move_as_ok();
LOG(INFO) << "RTT: " << connection_->rtt_;
connection_->rtt_ = 0;
LOG(INFO) << "RTT: " << connection_->extra().rtt;
connection_->extra().rtt = 0;
loop();
}