// NOTE(review): this text appears to have lost its original line breaks and everything that was
// inside angle brackets (see the bare `#include` directives and `template static void` below), so
// existing `//` comments are followed by code on the same physical line and many type names are
// missing. Restore both from the upstream file before relying on this copy; the comments added
// here describe only what the remaining text shows.
//
// [next line] License header and includes, then:
//  - check_td_error(result): LOG_CHECKs that result->get_id() is not td::td_api::error::ID and
//    logs to_string(result) on failure.
//  - class TestClient (td::Actor), continued on the following lines: stores its name, declares
//    Update {id, object} and the Listener interface (non-copyable, non-movable; start_listen and
//    stop_listen default to no-ops, on_update is pure virtual).
//  - TestClient::close(promise): stores the promise in close_promise_ and resets td_client_.
//    NOTE(review): nothing visible here ever calls set_value on close_promise_ - presumably
//    completion is signalled when the promise is destroyed; confirm.
// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "data.h" #include "td/telegram/Client.h" #include "td/telegram/ClientActor.h" #include "td/telegram/files/PartsManager.h" #include "td/telegram/td_api.h" #include "td/actor/actor.h" #include "td/actor/ConcurrentScheduler.h" #include "td/actor/PromiseFuture.h" #include "td/utils/base64.h" #include "td/utils/BufferedFd.h" #include "td/utils/common.h" #include "td/utils/filesystem.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/FileFd.h" #include "td/utils/port/path.h" #include "td/utils/port/sleep.h" #include "td/utils/port/thread.h" #include "td/utils/Promise.h" #include "td/utils/Random.h" #include "td/utils/Slice.h" #include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" #include "td/utils/tests.h" #include #include #include #include #include #include #include #include template static void check_td_error(T &result) { LOG_CHECK(result->get_id() != td::td_api::error::ID) << to_string(result); } class TestClient final : public td::Actor { public: explicit TestClient(td::string name) : name_(std::move(name)) { } struct Update { td::uint64 id; td::tl_object_ptr object; Update(td::uint64 id, td::tl_object_ptr object) : id(id), object(std::move(object)) { } }; class Listener { public: Listener() = default; Listener(const Listener &) = delete; Listener &operator=(const Listener &) = delete; Listener(Listener &&) = delete; Listener &operator=(Listener &&) = delete; virtual ~Listener() = default; virtual void start_listen(TestClient *client) { } virtual void stop_listen() { } virtual void on_update(std::shared_ptr update) = 0; }; void close(td::Promise<> close_promise) { close_promise_ = std::move(close_promise); td_client_.reset(); } 
// [next line] TestClient, continued:
//  - make_td_callback(): returns a local TdCallbackImpl that forwards on_result/on_error to this
//    actor with send_closure and sends TestClient::on_closed from its destructor.
//  - add_listener(): stores the listener in listeners_, then calls start_listen(this) on it.
//  - remove_listener(): only queues the pointer in pending_remove_; do_pending_remove_listeners()
//    performs the removals via do_remove_listener(), which calls stop_listen() and erases the
//    first matching entry of listeners_.
//  - on_result()/on_error(): both wrap (id, object) in an Update and pass it to on_update().
//  - on_update(): hands the same shared Update to every listener in order, then applies the queued
//    removals after the loop. A listener that moves update->object out leaves it null for the
//    listeners that follow.
//  - on_closed(): stops this actor.
//  - start_up(): calls td::rmrf(name_) and ignores its result, installs a new context, sets the
//    tag to name_, logs, and creates the "Td-proxy" actor with the callback above.
td::unique_ptr make_td_callback() { class TdCallbackImpl final : public td::TdCallback { public: explicit TdCallbackImpl(td::ActorId client) : client_(client) { } void on_result(td::uint64 id, td::tl_object_ptr result) final { send_closure(client_, &TestClient::on_result, id, std::move(result)); } void on_error(td::uint64 id, td::tl_object_ptr error) final { send_closure(client_, &TestClient::on_error, id, std::move(error)); } TdCallbackImpl(const TdCallbackImpl &) = delete; TdCallbackImpl &operator=(const TdCallbackImpl &) = delete; TdCallbackImpl(TdCallbackImpl &&) = delete; TdCallbackImpl &operator=(TdCallbackImpl &&) = delete; ~TdCallbackImpl() final { send_closure(client_, &TestClient::on_closed); } private: td::ActorId client_; }; return td::make_unique(actor_id(this)); } void add_listener(td::unique_ptr listener) { auto *ptr = listener.get(); listeners_.push_back(std::move(listener)); ptr->start_listen(this); } void remove_listener(Listener *listener) { pending_remove_.push_back(listener); } void do_pending_remove_listeners() { for (auto listener : pending_remove_) { do_remove_listener(listener); } pending_remove_.clear(); } void do_remove_listener(Listener *listener) { for (size_t i = 0; i < listeners_.size(); i++) { if (listeners_[i].get() == listener) { listener->stop_listen(); listeners_.erase(listeners_.begin() + i); break; } } } void on_result(td::uint64 id, td::tl_object_ptr result) { on_update(std::make_shared(id, std::move(result))); } void on_error(td::uint64 id, td::tl_object_ptr error) { on_update(std::make_shared(id, std::move(error))); } void on_update(std::shared_ptr update) { for (auto &listener : listeners_) { listener->on_update(update); } do_pending_remove_listeners(); } void on_closed() { stop(); } void start_up() final { td::rmrf(name_).ignore(); auto old_context = set_context(std::make_shared()); set_tag(name_); LOG(INFO) << "START UP!"; td_client_ = td::create_actor("Td-proxy", make_td_callback()); } td::ActorOwn td_client_; td::string 
// [next line] End of TestClient (private listeners_, pending_remove_, close_promise_), then:
//  - class TestClinetTask (Listener base for the tasks below):
//      on_update(): if update->id is a key of sent_queries_, calls the stored callback with the
//      moved-out update->object and erases the entry; process_update(update) is called afterwards
//      in either case, so it can receive an Update whose object is already null.
//      start_listen(): remembers the client and calls start_up().
//      send_query(function, callback): takes the next id from current_query_id_ (starts at 1, per
//      task object), stores the callback under it and sends td::ClientActor::request to
//      client_->td_client_.
//      NOTE(review): ids are counted per task, so two tasks on one TestClient can use the same id
//      - verify that responses cannot reach the wrong task.
//      stop(): asks the client to remove this listener.
//  - class DoAuthentication, continued on the next line: start_up() sends a first query whose
//    result goes to process_authorization_state(), which sets start_flag_ and picks the next
//    request from the state id (phone_ for WaitPhoneNumber, code_ for WaitCode, name_ for
//    WaitRegistration, a parameters object for WaitTdlibParameters).
name_; private: td::vector> listeners_; td::vector pending_remove_; td::Promise<> close_promise_; }; class TestClinetTask : public TestClient::Listener { public: void on_update(std::shared_ptr update) final { auto it = sent_queries_.find(update->id); if (it != sent_queries_.end()) { it->second(std::move(update->object)); sent_queries_.erase(it); } process_update(update); } void start_listen(TestClient *client) final { client_ = client; start_up(); } virtual void process_update(std::shared_ptr update) { } template void send_query(td::tl_object_ptr function, F callback) { auto id = current_query_id_++; sent_queries_[id] = std::forward(callback); send_closure(client_->td_client_, &td::ClientActor::request, id, std::move(function)); } protected: std::map)>> sent_queries_; TestClient *client_ = nullptr; td::uint64 current_query_id_ = 1; virtual void start_up() { } void stop() { client_->remove_listener(this); } }; class DoAuthentication final : public TestClinetTask { public: DoAuthentication(td::string name, td::string phone, td::string code, td::Promise<> promise) : name_(std::move(name)), phone_(std::move(phone)), code_(std::move(code)), promise_(std::move(promise)) { } void start_up() final { send_query(td::make_tl_object(), [this](auto res) { this->process_authorization_state(std::move(res)); }); } void process_authorization_state(td::tl_object_ptr authorization_state) { start_flag_ = true; td::tl_object_ptr function; switch (authorization_state->get_id()) { case td::td_api::authorizationStateWaitPhoneNumber::ID: function = td::make_tl_object(phone_, nullptr); break; case td::td_api::authorizationStateWaitCode::ID: function = td::make_tl_object(code_); break; case td::td_api::authorizationStateWaitRegistration::ID: function = td::make_tl_object(name_, ""); break; case td::td_api::authorizationStateWaitTdlibParameters::ID: { auto parameters = td::td_api::make_object(); parameters->use_test_dc_ = true; parameters->database_directory_ = name_ + TD_DIR_SLASH; 
// [next line] DoAuthentication, continued:
//  - remaining parameter fields; authorizationStateReady calls on_authorization_ready() (logs and
//    stop()) and returns without sending anything; any other state logs an error and hits
//    UNREACHABLE().
//  - the chosen request is sent and its result is LOG_CHECKed to be td_api::ok.
//  - process_update(): ignores updates until start_flag_ is set and updates with a null object;
//    for updateAuthorizationState it feeds the new state to process_authorization_state().
//  - promise_ is stored by the constructor and not touched again in the visible code.
// Then class SetUsername, continued on the next line: start_up() sends a query whose result goes to
// process_me_user(), which CHECKs for td_api::user, stores its id in self_id_ and, when the
// current username differs from username_, logs and sends a request built from username_.
parameters->use_message_database_ = true; parameters->use_secret_chats_ = true; parameters->api_id_ = 94575; parameters->api_hash_ = "a3406de8d171bb422bb6ddf3bbd800e2"; parameters->system_language_code_ = "en"; parameters->device_model_ = "Desktop"; parameters->application_version_ = "tdclient-test"; parameters->ignore_file_names_ = false; parameters->enable_storage_optimizer_ = true; function = td::td_api::make_object(std::move(parameters)); break; } case td::td_api::authorizationStateReady::ID: on_authorization_ready(); return; default: LOG(ERROR) << "Unexpected authorization state " << to_string(authorization_state); UNREACHABLE(); } send_query(std::move(function), [](auto res) { LOG_CHECK(res->get_id() == td::td_api::ok::ID) << to_string(res); }); } void on_authorization_ready() { LOG(INFO) << "GOT AUTHORIZED"; stop(); } private: td::string name_; td::string phone_; td::string code_; td::Promise<> promise_; bool start_flag_{false}; void process_update(std::shared_ptr update) final { if (!start_flag_) { return; } if (!update->object) { return; } if (update->object->get_id() == td::td_api::updateAuthorizationState::ID) { auto update_authorization_state = td::move_tl_object_as(update->object); process_authorization_state(std::move(update_authorization_state->authorization_state_)); } } }; class SetUsername final : public TestClinetTask { public: SetUsername(td::string username, td::Promise<> promise) : username_(std::move(username)), promise_(std::move(promise)) { } private: td::string username_; td::Promise<> promise_; td::int64 self_id_ = 0; td::string tag_; void start_up() final { send_query(td::make_tl_object(), [this](auto res) { this->process_me_user(std::move(res)); }); } void process_me_user(td::tl_object_ptr res) { CHECK(res->get_id() == td::td_api::user::ID); auto user = td::move_tl_object_as(res); self_id_ = user->id_; if (user->username_ != username_) { LOG(INFO) << "SET USERNAME: " << username_; send_query(td::make_tl_object(username_), [this](auto 
// [next line] SetUsername, continued:
//  - the username request's callback CHECKs for td_api::ok and calls send_self_message(); when the
//    username already matches, send_self_message() is called directly.
//  - send_self_message(): sets tag_ to the string form of
//    td::format::as_hex(td::Random::secure_int64()), sends a request built from (self_id_, false),
//    CHECKs that the result is a td_api::chat and sends a text message "<tag_> INIT" to chat->id_
//    with an empty callback.
//  - process_update(): for updateMessageSendSucceeded carrying a messageText whose text starts
//    with tag_, logs "GOT SELF MESSAGE" and stops the task.
// Then class CheckTestA, finished on the next line: process_update() handles updateNewMessage
// carrying a messageText whose text starts with tag_; it LOG_CHECKs that the text compares greater
// than the previous one, remembers it, decrements cnt_ (initially 20) and stops at zero.
// NOTE(review): unlike the sibling tasks, CheckTestA::process_update() dereferences update->object
// without a null check.
res) { CHECK(res->get_id() == td::td_api::ok::ID); this->send_self_message(); }); } else { send_self_message(); } } void send_self_message() { tag_ = PSTRING() << td::format::as_hex(td::Random::secure_int64()); send_query(td::make_tl_object(self_id_, false), [this](auto res) { CHECK(res->get_id() == td::td_api::chat::ID); auto chat = td::move_tl_object_as(res); this->send_query(td::make_tl_object( chat->id_, 0, 0, nullptr, nullptr, td::make_tl_object( td::make_tl_object(PSTRING() << tag_ << " INIT", td::Auto()), false, false)), [](auto res) {}); }); } void process_update(std::shared_ptr update) final { if (!update->object) { return; } if (update->object->get_id() == td::td_api::updateMessageSendSucceeded::ID) { auto updateNewMessage = td::move_tl_object_as(update->object); auto &message = updateNewMessage->message_; if (message->content_->get_id() == td::td_api::messageText::ID) { auto messageText = td::move_tl_object_as(message->content_); auto text = messageText->text_->text_; if (text.substr(0, tag_.size()) == tag_) { LOG(INFO) << "GOT SELF MESSAGE"; return stop(); } } } } }; class CheckTestA final : public TestClinetTask { public: CheckTestA(td::string tag, td::Promise<> promise) : tag_(std::move(tag)), promise_(std::move(promise)) { } private: td::string tag_; td::Promise<> promise_; td::string previous_text_; int cnt_ = 20; void process_update(std::shared_ptr update) final { if (update->object->get_id() == td::td_api::updateNewMessage::ID) { auto updateNewMessage = td::move_tl_object_as(update->object); auto &message = updateNewMessage->message_; if (message->content_->get_id() == td::td_api::messageText::ID) { auto messageText = td::move_tl_object_as(message->content_); auto text = messageText->text_->text_; if (text.substr(0, tag_.size()) == tag_) { LOG_CHECK(text > previous_text_) << td::tag("now", text) << td::tag("previous", previous_text_); previous_text_ = text; cnt_--; LOG(INFO) << "GOT " << td::tag("text", text) << td::tag("left", cnt_); if (cnt_ == 
// [next line] End of CheckTestA, then:
//  - class TestA: start_up() sends a request built from username_, CHECKs that the result is a
//    td_api::chat and sends 20 text messages "<tag_> <1000 + i>" to chat->id_; every send callback
//    calls stop().
//  - class TestSecretChat, continued on the next line: start_up() sends a request built from
//    username_, CHECKs that the result is a chat of type chatTypePrivate, then sends a request
//    built from info->user_id_ whose handler `f` CHECKs for a chat and records chat_id_ and
//    secret_chat_id_. process_update() reacts to updateSecretChat only when the id equals
//    secret_chat_id_ and the state is secretChatStateReady; it then logs and sends 20 messages to
//    chat_id_.
0) { return stop(); } } } } } }; class TestA final : public TestClinetTask { public: TestA(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) { } void start_up() final { send_query(td::make_tl_object(username_), [this](auto res) { CHECK(res->get_id() == td::td_api::chat::ID); auto chat = td::move_tl_object_as(res); for (int i = 0; i < 20; i++) { this->send_query( td::make_tl_object( chat->id_, 0, 0, nullptr, nullptr, td::make_tl_object( td::make_tl_object(PSTRING() << tag_ << " " << (1000 + i), td::Auto()), false, false)), [&](auto res) { this->stop(); }); } }); } private: td::string tag_; td::string username_; }; class TestSecretChat final : public TestClinetTask { public: TestSecretChat(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) { } void start_up() final { auto f = [this](auto res) { CHECK(res->get_id() == td::td_api::chat::ID); auto chat = td::move_tl_object_as(res); this->chat_id_ = chat->id_; this->secret_chat_id_ = td::move_tl_object_as(chat->type_)->secret_chat_id_; }; send_query(td::make_tl_object(username_), [this, f = std::move(f)](auto res) mutable { CHECK(res->get_id() == td::td_api::chat::ID); auto chat = td::move_tl_object_as(res); CHECK(chat->type_->get_id() == td::td_api::chatTypePrivate::ID); auto info = td::move_tl_object_as(chat->type_); this->send_query(td::make_tl_object(info->user_id_), std::move(f)); }); } void process_update(std::shared_ptr update) final { if (!update->object) { return; } if (update->object->get_id() == td::td_api::updateSecretChat::ID) { auto update_secret_chat = td::move_tl_object_as(update->object); if (update_secret_chat->secret_chat_->id_ != secret_chat_id_ || update_secret_chat->secret_chat_->state_->get_id() != td::td_api::secretChatStateReady::ID) { return; } LOG(INFO) << "SEND ENCRYPTED MESSAGES"; for (int i = 0; i < 20; i++) { send_query( td::make_tl_object( chat_id_, 0, 0, nullptr, nullptr, td::make_tl_object( 
// [next line] TestSecretChat, continued: the 20 messages carry "<tag_> <1000 + i>" and use empty
// callbacks. Then class TestFileGenerated, continued on the following lines:
//  - process_update(): for updateNewMessage it records message->chat_id_ and, when a messageText
//    starts with tag_ and the text after one more character equals "ONE_FILE", calls one_file();
//    for updateFileGenerationStart it calls generate_file(); for updateFile it logs the update.
//    NOTE(review): text.substr(tag_.size() + 1) throws std::out_of_range when the text is exactly
//    tag_ (position past the end) - confirm that such a message cannot arrive.
//  - one_file(): builds file_path from "test_documents", TD_DIR_SLASH and "a.txt", calls
//    td::mkpath on it, opens it with Create | Truncate | Write, writes the integers 1..99999 one
//    per line through a BufferedFd, flushes and closes it, then sends two requests that reference
//    file_path with the "square" conversion.
td::make_tl_object(PSTRING() << tag_ << " " << (1000 + i), td::Auto()), false, false)), [](auto res) {}); } } } private: td::string tag_; td::string username_; td::int64 secret_chat_id_ = 0; td::int64 chat_id_ = 0; }; class TestFileGenerated final : public TestClinetTask { public: TestFileGenerated(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) { } void start_up() final { } void process_update(std::shared_ptr update) final { if (!update->object) { return; } if (update->object->get_id() == td::td_api::updateNewMessage::ID) { auto updateNewMessage = td::move_tl_object_as(update->object); auto &message = updateNewMessage->message_; chat_id_ = message->chat_id_; if (message->content_->get_id() == td::td_api::messageText::ID) { auto messageText = td::move_tl_object_as(message->content_); auto text = messageText->text_->text_; if (text.substr(0, tag_.size()) == tag_) { if (text.substr(tag_.size() + 1) == "ONE_FILE") { return one_file(); } } } } else if (update->object->get_id() == td::td_api::updateFileGenerationStart::ID) { auto info = td::move_tl_object_as(update->object); generate_file(info->generation_id_, info->original_path_, info->destination_path_, info->conversion_); } else if (update->object->get_id() == td::td_api::updateFile::ID) { auto file = td::move_tl_object_as(update->object); LOG(INFO) << to_string(file); } } void one_file() { LOG(ERROR) << "Start ONE_FILE test"; auto file_path = PSTRING() << "test_documents" << TD_DIR_SLASH << "a.txt"; td::mkpath(file_path).ensure(); auto raw_file = td::FileFd::open(file_path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write) .move_as_ok(); auto file = td::BufferedFd(std::move(raw_file)); for (int i = 1; i < 100000; i++) { file.write(PSLICE() << i << "\n").ensure(); } file.flush_write().ensure(); // important file.close(); send_query(td::make_tl_object( chat_id_, 0, 0, nullptr, nullptr, td::make_tl_object( td::make_tl_object(file_path, 
// [next line] TestFileGenerated, continued:
//  - the first request also passes a thumbnail built from file_path with the "thumbnail"
//    conversion, the second passes nullptr instead; both results go through check_td_error().
//  - nested actor GenerateFile: start_up() opens original_path_ for reading and destination_path_
//    for writing (both CHECKed) and yields. loop() reads up to 10000 unsigned values per call and
//    writes x * x for each, one per line; when fscanf fails to read a value it stops the actor,
//    otherwise it reports the current ftell(to) offset through parent_->send_query and calls
//    set_timeout_in(0.02). tear_down() closes both files and sends a final request built from
//    (id_, nullptr).
//    NOTE(review): x * x is computed in td::uint32 and check_file() further down compares against
//    y * y in the same type, so any wrap-around is the same on both sides - presumably intended;
//    confirm.
//    NOTE(review): loop() arms a timeout but no timeout_expired() override is visible in
//    GenerateFile - presumably the td::Actor default resumes loop(); confirm.
//  - generate_file(), continued on the next line: logs its arguments.
"square", 0), td::make_tl_object( td::make_tl_object(file_path, "thumbnail", 0), 0, 0), true, td::make_tl_object(tag_, td::Auto()))), [](auto res) { check_td_error(res); }); send_query(td::make_tl_object( chat_id_, 0, 0, nullptr, nullptr, td::make_tl_object( td::make_tl_object(file_path, "square", 0), nullptr, true, td::make_tl_object(tag_, td::Auto()))), [](auto res) { check_td_error(res); }); } class GenerateFile final : public td::Actor { public: GenerateFile(TestClinetTask *parent, td::int64 id, td::string original_path, td::string destination_path, td::string conversion) : parent_(parent) , id_(id) , original_path_(std::move(original_path)) , destination_path_(std::move(destination_path)) , conversion_(std::move(conversion)) { } private: TestClinetTask *parent_; td::int64 id_; td::string original_path_; td::string destination_path_; td::string conversion_; FILE *from = nullptr; FILE *to = nullptr; void start_up() final { from = std::fopen(original_path_.c_str(), "rb"); CHECK(from); to = std::fopen(destination_path_.c_str(), "wb"); CHECK(to); yield(); } void loop() final { int cnt = 0; while (true) { td::uint32 x; auto r = std::fscanf(from, "%u", &x); if (r != 1) { return stop(); } std::fprintf(to, "%u\n", x * x); if (++cnt >= 10000) { break; } } auto ready = std::ftell(to); LOG(ERROR) << "READY: " << ready; parent_->send_query(td::make_tl_object( id_, 1039823 /*yeah, exact size of this file*/, td::narrow_cast(ready)), [](auto result) { check_td_error(result); }); set_timeout_in(0.02); } void tear_down() final { std::fclose(from); std::fclose(to); parent_->send_query(td::make_tl_object(id_, nullptr), [](auto result) { check_td_error(result); }); } }; void generate_file(td::int64 id, const td::string &original_path, const td::string &destination_path, const td::string &conversion) { LOG(ERROR) << "Generate file " << td::tag("id", id) << td::tag("original_path", original_path) << td::tag("destination_path", destination_path) << td::tag("conversion", conversion); 
// [next line] generate_file(), continued: "square" creates a GenerateFile actor and releases it;
// "thumbnail" writes the base64url-decoded `thumbnail` data to destination_path and sends a request
// built from (id, nullptr); any other conversion is LOG(FATAL). `thumbnail`/`thumbnail_size` are
// not defined in this text (presumably they come from "data.h").
// Then class CheckTestC, continued on the next line: start_up() sends a request built from
// username_, CHECKs for a td_api::chat, stores chat_id_ and calls one_file(), which sends the text
// "<tag_> ONE_FILE" to that chat. process_update() handles updateNewMessage carrying a
// messageDocument whose caption starts with tag_: it records the document's file id, logs it and
// sends a request built from (file_id_to_check_, 1, 0, 0, false).
if (conversion == "square") { td::create_actor("GenerateFile", this, id, original_path, destination_path, conversion).release(); } else if (conversion == "thumbnail") { td::write_file(destination_path, td::base64url_decode(td::Slice(thumbnail, thumbnail_size)).ok()).ensure(); send_query(td::make_tl_object(id, nullptr), [](auto result) { check_td_error(result); }); } else { LOG(FATAL) << "Unknown " << td::tag("conversion", conversion); } } private: td::string tag_; td::string username_; td::int64 chat_id_ = 0; }; class CheckTestC final : public TestClinetTask { public: CheckTestC(td::string username, td::string tag, td::Promise<> promise) : username_(std::move(username)), tag_(std::move(tag)), promise_(std::move(promise)) { } void start_up() final { send_query(td::make_tl_object(username_), [this](auto res) { CHECK(res->get_id() == td::td_api::chat::ID); auto chat = td::move_tl_object_as(res); chat_id_ = chat->id_; this->one_file(); }); } private: td::string username_; td::string tag_; td::Promise<> promise_; td::int64 chat_id_ = 0; void one_file() { send_query(td::make_tl_object( chat_id_, 0, 0, nullptr, nullptr, td::make_tl_object( td::make_tl_object(PSTRING() << tag_ << " ONE_FILE", td::Auto()), false, false)), [](auto res) { check_td_error(res); }); } void process_update(std::shared_ptr update) final { if (!update->object) { return; } if (update->object->get_id() == td::td_api::updateNewMessage::ID) { auto updateNewMessage = td::move_tl_object_as(update->object); auto &message = updateNewMessage->message_; if (message->content_->get_id() == td::td_api::messageDocument::ID) { auto messageDocument = td::move_tl_object_as(message->content_); auto text = messageDocument->caption_->text_; if (text.substr(0, tag_.size()) == tag_) { file_id_to_check_ = messageDocument->document_->document_->id_; LOG(ERROR) << "GOT FILE " << to_string(messageDocument->document_->document_); send_query(td::make_tl_object(file_id_to_check_, 1, 0, 0, false), [](auto res) { 
// [next line] CheckTestC, continued: for updateFile on file_id_to_check_ with
// is_downloading_completed_ set it calls check_file(path), which reads unsigned values from the
// file, CHECKs that the k-th value equals k * k (k starting at 1), closes the file and stops.
// Then class LoginTestActor, continued on the following lines:
//  - the constructor sets *status_ to OK; begin_stage() logs and stores the stage name and calls
//    set_timeout_in(timeout).
//  - start_up(): begins stage "Logging in" (160), creates the actors named "AliceClient" and
//    "BobClient" and adds one listener to each, built from the name, the phone, "33333" and an
//    event promise bound to start_up_fence_dec().
//  - start_up_fence_dec(): decrements start_up_fence_ (initially 3) and calls init() when it
//    reaches 1 via `return init();`. NOTE(review): the WaitActor code after that return is
//    unreachable; with the two promises created in start_up() the counter only goes 3 -> 2 -> 1,
//    so the == 0 branch is not reached from the visible code.
check_td_error(res); }); } } } else if (update->object->get_id() == td::td_api::updateFile::ID) { auto updateFile = td::move_tl_object_as(update->object); if (updateFile->file_->id_ == file_id_to_check_ && (updateFile->file_->local_->is_downloading_completed_)) { check_file(updateFile->file_->local_->path_); } } } void check_file(td::CSlice path) { FILE *from = std::fopen(path.c_str(), "rb"); CHECK(from); td::uint32 x; td::uint32 y = 1; while (std::fscanf(from, "%u", &x) == 1) { CHECK(x == y * y); y++; } std::fclose(from); stop(); } td::int32 file_id_to_check_ = 0; }; class LoginTestActor final : public td::Actor { public: explicit LoginTestActor(td::Status *status) : status_(status) { *status_ = td::Status::OK(); } private: td::Status *status_; td::ActorOwn alice_; td::ActorOwn bob_; td::string alice_phone_ = "9996636437"; td::string bob_phone_ = "9996636438"; td::string alice_username_ = "alice_" + alice_phone_; td::string bob_username_ = "bob_" + bob_phone_; td::string stage_name_; void begin_stage(td::string stage_name, double timeout) { LOG(WARNING) << "Begin stage '" << stage_name << "'"; stage_name_ = std::move(stage_name); set_timeout_in(timeout); } void start_up() final { begin_stage("Logging in", 160); alice_ = td::create_actor("AliceClient", "alice"); bob_ = td::create_actor("BobClient", "bob"); td::send_closure(alice_, &TestClient::add_listener, td::make_unique( "alice", alice_phone_, "33333", td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec)))); td::send_closure(bob_, &TestClient::add_listener, td::make_unique( "bob", bob_phone_, "33333", td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec)))); } int start_up_fence_ = 3; void start_up_fence_dec() { --start_up_fence_; if (start_up_fence_ == 0) { init(); } else if (start_up_fence_ == 1) { return init(); class WaitActor final : public td::Actor { public: WaitActor(double timeout, td::Promise<> promise) : timeout_(timeout), 
// [next line] Rest of the unreachable WaitActor code, then LoginTestActor continued:
//  - init(): adds to each client a listener built from that client's username and an event promise
//    bound to init_fence_dec(); the second init_fence_dec() call starts test_a().
//  - test_a(): begins stage "Ready to create chats" (80) and draws two tags. Bob gets a listener
//    built from (alice_tag, promise) and Alice one built from (bob_tag, promise), both promises
//    bound to test_a_fence(); Alice gets a listener built from (alice_tag, bob_username_) and Bob
//    one built from (bob_tag, alice_username_). The second test_a_fence() call starts test_b().
//  - timeout_expired(): LOG(FATAL) naming the current stage.
//  - test_b_fence()/test_c_fence(): one call each starts test_c() / finish().
//  - test_b(), continued on the next line: begins stage "Create secret chat" (40).
promise_(std::move(promise)) { } void start_up() final { set_timeout_in(timeout_); } void timeout_expired() final { stop(); } private: double timeout_; td::Promise<> promise_; }; td::create_actor("WaitActor", 2, td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec))) .release(); } } void init() { td::send_closure(alice_, &TestClient::add_listener, td::make_unique(alice_username_, td::create_event_promise(self_closure( this, &LoginTestActor::init_fence_dec)))); td::send_closure(bob_, &TestClient::add_listener, td::make_unique( bob_username_, td::create_event_promise(self_closure(this, &LoginTestActor::init_fence_dec)))); } int init_fence_ = 2; void init_fence_dec() { if (--init_fence_ == 0) { test_a(); } } int test_a_fence_ = 2; void test_a_fence() { if (--test_a_fence_ == 0) { test_b(); } } void test_a() { begin_stage("Ready to create chats", 80); td::string alice_tag = PSTRING() << td::format::as_hex(td::Random::secure_int64()); td::string bob_tag = PSTRING() << td::format::as_hex(td::Random::secure_int64()); td::send_closure(bob_, &TestClient::add_listener, td::make_unique( alice_tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_a_fence)))); td::send_closure(alice_, &TestClient::add_listener, td::make_unique( bob_tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_a_fence)))); td::send_closure(alice_, &TestClient::add_listener, td::make_unique(alice_tag, bob_username_)); td::send_closure(bob_, &TestClient::add_listener, td::make_unique(bob_tag, alice_username_)); // td::send_closure(alice_, &TestClient::add_listener, td::make_unique(bob_username_)); } void timeout_expired() final { LOG(FATAL) << "Timeout expired in stage '" << stage_name_ << "'"; } int test_b_fence_ = 1; void test_b_fence() { if (--test_b_fence_ == 0) { test_c(); } } int test_c_fence_ = 1; void test_c_fence() { if (--test_c_fence_ == 0) { finish(); } } void test_b() { begin_stage("Create secret chat", 40); td::string tag = 
// [next line] LoginTestActor, continued:
//  - test_b(): Bob gets a listener built from (tag, promise bound to test_b_fence()), Alice one
//    built from (tag, bob_username_).
//  - test_c(): begins stage "Send generated file" (240); Bob gets a listener built from
//    (alice_username_, tag, promise bound to test_c_fence()), Alice one built from
//    (tag, bob_username_).
//  - finish(): sends TestClient::close to both clients with promises bound to finish_fence(); the
//    second finish_fence() call finishes the scheduler and stops this actor.
// Then class Tdclient_login (td::Test): the first step() initializes the scheduler with 4, creates
// a LoginTestActor that is given &result_ and starts the scheduler; every step() returns true
// while sched_.run_main(10) does, and otherwise finishes the scheduler, logs an error result,
// asserts result_.is_ok() and returns false. Its registration is commented out.
// Then TEST(Client, Simple): sends request id 3 built with argument 3 and polls receive(10) until
// the response with id 3 arrives, asserting value_ == 9. TEST(Client, SimpleMulti) starts here.
PSTRING() << td::format::as_hex(td::Random::secure_int64()); td::send_closure( bob_, &TestClient::add_listener, td::make_unique(tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_b_fence)))); td::send_closure(alice_, &TestClient::add_listener, td::make_unique(tag, bob_username_)); } void test_c() { begin_stage("Send generated file", 240); td::string tag = PSTRING() << td::format::as_hex(td::Random::secure_int64()); td::send_closure( bob_, &TestClient::add_listener, td::make_unique(alice_username_, tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_c_fence)))); td::send_closure(alice_, &TestClient::add_listener, td::make_unique(tag, bob_username_)); } int finish_fence_ = 2; void finish_fence() { finish_fence_--; if (finish_fence_ == 0) { td::Scheduler::instance()->finish(); stop(); } } void finish() { td::send_closure(alice_, &TestClient::close, td::create_event_promise(self_closure(this, &LoginTestActor::finish_fence))); td::send_closure(bob_, &TestClient::close, td::create_event_promise(self_closure(this, &LoginTestActor::finish_fence))); } }; class Tdclient_login final : public td::Test { public: using Test::Test; bool step() final { if (!is_inited_) { sched_.init(4); sched_.create_actor_unsafe(0, "LoginTestActor", &result_).release(); sched_.start(); is_inited_ = true; } bool ret = sched_.run_main(10); if (ret) { return true; } sched_.finish(); if (result_.is_error()) { LOG(ERROR) << result_; } ASSERT_TRUE(result_.is_ok()); return false; } private: bool is_inited_ = false; td::ConcurrentScheduler sched_; td::Status result_; }; //RegisterTest Tdclient_login("Tdclient_login"); TEST(Client, Simple) { td::Client client; // client.execute({1, td::td_api::make_object("actor", 1)}); client.send({3, td::make_tl_object(3)}); while (true) { auto result = client.receive(10); if (result.id == 3) { auto test_int = td::td_api::move_object_as(result.object); ASSERT_EQ(test_int->value_, 9); break; } } } TEST(Client, SimpleMulti) { 
// [next line] TEST(Client, SimpleMulti), continued: 7 clients each send request id i + 2 built
// with argument 3 and, at random, a second request with id 1; each client is then polled until
// its i + 2 response arrives, which must be a td_api::testInt with value_ == 9.
// Inside `#if !TD_THREAD_UNSUPPORTED`:
//  - TEST(Client, Multi): 4 threads each run 1000 clients. Every client sends one request with id
//    j + 2 + 1000 * i; the id-1 request is sent before polling for odd j and after the matching
//    response for even j. ok_count is incremented for the matching response and for the
//    updateAuthorizationState carrying authorizationStateClosed, which also ends the polling loop;
//    the total must be 8 * 1000.
//  - TEST(Client, Manager), continued on the next line: threads_n is 4 when EventFd is supported,
//    otherwise 1. Requests are first sent to client ids 0 and -1; each thread then creates
//    clients_n + 1 client ids and sends request id 3 to all but the first.
//    NOTE(review): the loop variable `i` inside the thread lambda shadows the outer `i`.
std::vector clients(7); //for (auto &client : clients) { //client.execute({1, td::td_api::make_object("td_requests", 1)}); //} for (size_t i = 0; i < clients.size(); i++) { clients[i].send({i + 2, td::make_tl_object(3)}); if (td::Random::fast_bool()) { clients[i].send({1, td::make_tl_object()}); } } for (size_t i = 0; i < clients.size(); i++) { while (true) { auto result = clients[i].receive(10); if (result.id == i + 2) { CHECK(result.object->get_id() == td::td_api::testInt::ID); auto test_int = td::td_api::move_object_as(result.object); ASSERT_EQ(test_int->value_, 9); break; } } } } #if !TD_THREAD_UNSUPPORTED TEST(Client, Multi) { td::vector threads; std::atomic ok_count{0}; for (int i = 0; i < 4; i++) { threads.emplace_back([i, &ok_count] { for (int j = 0; j < 1000; j++) { td::Client client; auto request_id = static_cast(j + 2 + 1000 * i); client.send({request_id, td::make_tl_object(3)}); if (j & 1) { client.send({1, td::make_tl_object()}); } while (true) { auto result = client.receive(10); if (result.id == request_id) { ok_count++; if ((j & 1) == 0) { client.send({1, td::make_tl_object()}); } } if (result.id == 0 && result.object != nullptr && result.object->get_id() == td::td_api::updateAuthorizationState::ID && static_cast(result.object.get()) ->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) { ok_count++; break; } } } }); } for (auto &thread : threads) { thread.join(); } ASSERT_EQ(8 * 1000, ok_count.load()); } TEST(Client, Manager) { td::vector threads; td::ClientManager client; #if !TD_EVENTFD_UNSUPPORTED // Client must be used from a single thread if there is no EventFd int threads_n = 4; #else int threads_n = 1; #endif int clients_n = 1000; client.send(0, 3, td::make_tl_object(3)); client.send(-1, 3, td::make_tl_object(3)); for (int i = 0; i < threads_n; i++) { threads.emplace_back([&] { for (int i = 0; i <= clients_n; i++) { auto id = client.create_client_id(); if (i != 0) { client.send(id, 3, td::make_tl_object(3)); } } }); } 
// [next line] TEST(Client, Manager), continued: after joining, events are received until
// threads_n * clients_n distinct client ids were seen; events for client ids 0 and -1 must be
// td_api::error with code_ 400, and each event with request_id 3 must be a td_api::testInt from a
// client id not seen before.
// Inside `#if !TD_EVENTFD_UNSUPPORTED`, TEST(Client, Close), continued on the next line:
//  - send_thread: until stop_send is set, records the next request id in request_ids (under the
//    mutex), sends the request and increments send_count; sets can_stop_receive when done.
//  - receive_thread: a response with a null object sets stop_send if it is not set yet, otherwise
//    the thread returns. For response.id > 0 it may set stop_send on an error with code_ 500
//    (subject to td::Random::fast(0, max_continue_send) == 0), increments receive_count and erases
//    the id from request_ids, CHECKing it was present. It breaks when can_stop_receive is set and
//    receive_count == send_count.
//  - the main thread first sleeps for a randomly chosen time.
for (auto &thread : threads) { thread.join(); } std::set ids; while (ids.size() != static_cast(threads_n) * clients_n) { auto event = client.receive(10); if (event.client_id == 0 || event.client_id == -1) { ASSERT_EQ(td::td_api::error::ID, event.object->get_id()); ASSERT_EQ(400, static_cast(*event.object).code_); continue; } if (event.request_id == 3) { ASSERT_EQ(td::td_api::testInt::ID, event.object->get_id()); ASSERT_TRUE(ids.insert(event.client_id).second); } } } #if !TD_EVENTFD_UNSUPPORTED // Client must be used from a single thread if there is no EventFd TEST(Client, Close) { std::atomic stop_send{false}; std::atomic can_stop_receive{false}; std::atomic send_count{1}; std::atomic receive_count{0}; td::Client client; std::mutex request_ids_mutex; std::set request_ids; request_ids.insert(1); td::thread send_thread([&] { td::uint64 request_id = 2; while (!stop_send.load()) { { std::unique_lock guard(request_ids_mutex); request_ids.insert(request_id); } client.send({request_id++, td::make_tl_object(3)}); send_count++; } can_stop_receive = true; }); td::thread receive_thread([&] { auto max_continue_send = td::Random::fast_bool() ? 0 : 1000; while (true) { auto response = client.receive(10.0); if (response.object == nullptr) { if (!stop_send) { stop_send = true; } else { return; } } if (response.id > 0) { if (!stop_send && response.object->get_id() == td::td_api::error::ID && static_cast(*response.object).code_ == 500 && td::Random::fast(0, max_continue_send) == 0) { stop_send = true; } receive_count++; { std::unique_lock guard(request_ids_mutex); size_t erased_count = request_ids.erase(response.id); CHECK(erased_count > 0); } } if (can_stop_receive && receive_count == send_count) { break; } } }); td::usleep_for((td::Random::fast_bool() ? 0 : 1000) * (td::Random::fast_bool() ? 
// [next line] TEST(Client, Close), continued: sends the request with id 1, joins both threads and
// asserts that send_count == receive_count and that request_ids is empty (id 1 was inserted up
// front and send_count starts at 1, presumably to account for that request).
// Then TEST(Client, ManagerClose), continued on the next line: same structure using
// td::ClientManager and one client id. Here a null-object response sets can_stop_send (or returns
// if stop_send is already set); once can_stop_send is set, each loop iteration decrements
// max_continue_send and sets stop_send when the value before the decrement was <= 0.
1 : 50)); client.send({1, td::make_tl_object()}); send_thread.join(); receive_thread.join(); ASSERT_EQ(send_count.load(), receive_count.load()); ASSERT_TRUE(request_ids.empty()); } TEST(Client, ManagerClose) { std::atomic stop_send{false}; std::atomic can_stop_receive{false}; std::atomic send_count{1}; std::atomic receive_count{0}; td::ClientManager client_manager; auto client_id = client_manager.create_client_id(); std::mutex request_ids_mutex; std::set request_ids; request_ids.insert(1); td::thread send_thread([&] { td::uint64 request_id = 2; while (!stop_send.load()) { { std::unique_lock guard(request_ids_mutex); request_ids.insert(request_id); } client_manager.send(client_id, request_id++, td::make_tl_object(3)); send_count++; } can_stop_receive = true; }); td::thread receive_thread([&] { auto max_continue_send = td::Random::fast_bool() ? 0 : 1000; bool can_stop_send = false; while (true) { auto response = client_manager.receive(10.0); if (response.object == nullptr) { if (!stop_send) { can_stop_send = true; } else { return; } } if (can_stop_send && max_continue_send-- <= 0) { stop_send = true; } if (response.request_id > 0) { receive_count++; { std::unique_lock guard(request_ids_mutex); size_t erased_count = request_ids.erase(response.request_id); CHECK(erased_count > 0); } } if (can_stop_receive && receive_count == send_count) { break; } } }); td::usleep_for((td::Random::fast_bool() ? 0 : 1000) * (td::Random::fast_bool() ? 
// [next line] TEST(Client, ManagerClose), continued: sends the request with id 1, joins and makes
// the same two assertions; both `#if` blocks end here.
// Then TEST(Client, ManagerCloseOneThread), continued on the next line:
//  - send_request(client_id, expected_error_code): counts the request, records the expected code
//    under the next request id and sends a request built with argument 3.
//  - receive(): polls with a 1.0 timeout until receive_count == sent_count; for each response with
//    request_id > 0 it removes the recorded expectation (CHECKed to exist) and asserts the result:
//    with expected code 0, td_api::ok for request id 1 and td_api::testInt otherwise; with a
//    non-zero code, td_api::error carrying exactly that code_.
//  - each of the 3 rounds first sends to client ids -5..0 expecting 400, then creates a client id
//    and sends to ids -5..4 expecting 0 for the new id, 500 for positive ids below it and 400 for
//    the rest.
1 : 50)); client_manager.send(client_id, 1, td::make_tl_object()); send_thread.join(); receive_thread.join(); ASSERT_EQ(send_count.load(), receive_count.load()); ASSERT_TRUE(request_ids.empty()); } #endif #endif TEST(Client, ManagerCloseOneThread) { td::ClientManager client_manager; td::uint64 request_id = 2; std::map sent_requests; td::uint64 sent_count = 0; td::uint64 receive_count = 0; auto send_request = [&](td::int32 client_id, td::int32 expected_error_code) { sent_count++; sent_requests.emplace(request_id, expected_error_code); client_manager.send(client_id, request_id++, td::make_tl_object(3)); }; auto receive = [&] { while (receive_count != sent_count) { auto response = client_manager.receive(1.0); if (response.object == nullptr) { continue; } if (response.request_id > 0) { receive_count++; auto it = sent_requests.find(response.request_id); CHECK(it != sent_requests.end()); auto expected_error_code = it->second; sent_requests.erase(it); if (expected_error_code == 0) { if (response.request_id == 1) { ASSERT_EQ(td::td_api::ok::ID, response.object->get_id()); } else { ASSERT_EQ(td::td_api::testInt::ID, response.object->get_id()); } } else { ASSERT_EQ(td::td_api::error::ID, response.object->get_id()); ASSERT_EQ(expected_error_code, static_cast(*response.object).code_); } } } }; for (int t = 0; t < 3; t++) { for (td::int32 i = -5; i <= 0; i++) { send_request(i, 400); } receive(); auto client_id = client_manager.create_client_id(); for (td::int32 i = -5; i < 5; i++) { send_request(i, i == client_id ? 0 : (i > 0 && i < client_id ? 
// [next line] TEST(Client, ManagerCloseOneThread), continued: 10 requests to the new client
// expecting code 0, then the id-1 request (expected code 0) followed by two batches of 10
// requests expecting 500, calling receive() after each batch; finally sent_requests must be empty.
// Then TEST(PartsManager, hands): two td::PartsManager::init calls, each followed by
// ensure_error() on the returned status.
500 : 400)); } receive(); for (int i = 0; i < 10; i++) { send_request(client_id, 0); } receive(); sent_count++; sent_requests.emplace(1, 0); client_manager.send(client_id, 1, td::make_tl_object()); for (int i = 0; i < 10; i++) { send_request(client_id, 500); } receive(); for (int i = 0; i < 10; i++) { send_request(client_id, 500); } receive(); } ASSERT_TRUE(sent_requests.empty()); } TEST(PartsManager, hands) { { td::PartsManager pm; pm.init(0, 100000, false, 10, {0, 1, 2}, false, true).ensure_error(); //pm.set_known_prefix(0, false).ensure(); } { td::PartsManager pm; pm.init(1, 100000, true, 10, {0, 1, 2}, false, true).ensure_error(); } }