// tdlight/test/tdclient.cpp
//
// 1193 lines
// 40 KiB
// C++

//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "data.h"
#include "td/telegram/Client.h"
#include "td/telegram/ClientActor.h"
#include "td/telegram/files/PartsManager.h"
#include "td/telegram/td_api.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/base64.h"
#include "td/utils/BufferedFd.h"
#include "td/utils/common.h"
#include "td/utils/filesystem.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
#include "td/utils/port/sleep.h"
#include "td/utils/port/thread.h"
#include "td/utils/Promise.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/tests.h"
#include <atomic>
#include <cstdio>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
// Asserts with LOG_CHECK that `result` is not a td_api::error; the stringified object is streamed into the
// check's message. `result` is dereferenced unconditionally, so it must not be null.
template <class T>
static void check_td_error(T &result) {
LOG_CHECK(result->get_id() != td::td_api::error::ID) << to_string(result);
}
// Actor wrapper around td::ClientActor used by the tests below: every result or error reported by the
// client is wrapped into an Update and handed to each registered Listener.
class TestClient final : public td::Actor {
 public:
  explicit TestClient(td::string name) : name_(std::move(name)) {
  }

  // A response or update from the client: the identifier passed to the callback plus the received object.
  struct Update {
    td::uint64 id;
    td::tl_object_ptr<td::td_api::Object> object;
    Update(td::uint64 id, td::tl_object_ptr<td::td_api::Object> object) : id(id), object(std::move(object)) {
    }
  };

  // Interface of the objects notified about updates. Listeners are neither copyable nor movable.
  class Listener {
   public:
    Listener() = default;
    Listener(const Listener &) = delete;
    Listener &operator=(const Listener &) = delete;
    Listener(Listener &&) = delete;
    Listener &operator=(Listener &&) = delete;
    virtual ~Listener() = default;

    // Called by add_listener() right after the listener has been stored.
    virtual void start_listen(TestClient *client) {
    }

    // Called by do_remove_listener() just before the listener is erased.
    virtual void stop_listen() {
    }

    virtual void on_update(std::shared_ptr<Update> update) = 0;
  };

  // Stores the promise and drops the owned client actor.
  // NOTE(review): nothing in this class sets close_promise_ explicitly; presumably it fires when it is
  // destroyed together with the actor - confirm against td::Promise semantics.
  void close(td::Promise<> close_promise) {
    close_promise_ = std::move(close_promise);
    td_client_.reset();
  }

  // Builds the callback given to td::ClientActor. It forwards results and errors to this actor and
  // reports on_closed() from its destructor.
  td::unique_ptr<td::TdCallback> make_td_callback() {
    class TdCallbackImpl final : public td::TdCallback {
     public:
      explicit TdCallbackImpl(td::ActorId<TestClient> client) : client_(client) {
      }
      void on_result(td::uint64 id, td::tl_object_ptr<td::td_api::Object> result) final {
        send_closure(client_, &TestClient::on_result, id, std::move(result));
      }
      void on_error(td::uint64 id, td::tl_object_ptr<td::td_api::error> error) final {
        send_closure(client_, &TestClient::on_error, id, std::move(error));
      }
      TdCallbackImpl(const TdCallbackImpl &) = delete;
      TdCallbackImpl &operator=(const TdCallbackImpl &) = delete;
      TdCallbackImpl(TdCallbackImpl &&) = delete;
      TdCallbackImpl &operator=(TdCallbackImpl &&) = delete;
      ~TdCallbackImpl() final {
        send_closure(client_, &TestClient::on_closed);
      }

     private:
      td::ActorId<TestClient> client_;
    };
    return td::make_unique<TdCallbackImpl>(actor_id(this));
  }

  // Takes ownership of the listener and immediately starts it.
  void add_listener(td::unique_ptr<Listener> listener) {
    Listener *raw_listener = listener.get();
    listeners_.push_back(std::move(listener));
    raw_listener->start_listen(this);
  }

  // Only schedules the removal; the listener is erased after the current update has been dispatched.
  void remove_listener(Listener *listener) {
    pending_remove_.push_back(listener);
  }

  void do_pending_remove_listeners() {
    for (auto *scheduled : pending_remove_) {
      do_remove_listener(scheduled);
    }
    pending_remove_.clear();
  }

  // Erases the first stored listener with the given address, if any, after calling its stop_listen().
  void do_remove_listener(Listener *listener) {
    for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
      if (it->get() != listener) {
        continue;
      }
      listener->stop_listen();
      listeners_.erase(it);
      return;
    }
  }

  void on_result(td::uint64 id, td::tl_object_ptr<td::td_api::Object> result) {
    on_update(std::make_shared<Update>(id, std::move(result)));
  }

  void on_error(td::uint64 id, td::tl_object_ptr<td::td_api::error> error) {
    on_update(std::make_shared<Update>(id, std::move(error)));
  }

  // Dispatches one update to every listener in registration order, then applies the removals that
  // were requested while dispatching. All listeners share the same Update instance.
  void on_update(std::shared_ptr<Update> update) {
    for (auto &listener : listeners_) {
      listener->on_update(update);
    }
    do_pending_remove_listeners();
  }

  // Invoked once the TdCallback created by make_td_callback() is destroyed.
  void on_closed() {
    stop();
  }

  // Removes the directory named after the client (errors ignored) and creates the client actor.
  void start_up() final {
    td::rmrf(name_).ignore();
    auto old_context = set_context(std::make_shared<td::ActorContext>());
    set_tag(name_);
    LOG(INFO) << "Start up!";
    td_client_ = td::create_actor<td::ClientActor>("Td-proxy", make_td_callback());
  }

  td::ActorOwn<td::ClientActor> td_client_;
  td::string name_;

 private:
  td::vector<td::unique_ptr<Listener>> listeners_;
  td::vector<Listener *> pending_remove_;
  td::Promise<> close_promise_;
};
// Base class for test scenarios attached to a TestClient. It matches responses to the queries sent
// through send_query() and forwards every update to process_update().
class TestClinetTask : public TestClient::Listener {
 public:
  // Runs the callback registered for update->id, if any, then always calls process_update().
  // The callback receives the object by move, so process_update() sees a null object for answered queries.
  void on_update(std::shared_ptr<TestClient::Update> update) final {
    auto query_it = sent_queries_.find(update->id);
    if (query_it != sent_queries_.end()) {
      query_it->second(std::move(update->object));
      sent_queries_.erase(query_it);
    }
    process_update(update);
  }

  // Remembers the owning client and starts the task.
  void start_listen(TestClient *client) final {
    client_ = client;
    start_up();
  }

  virtual void process_update(std::shared_ptr<TestClient::Update> update) {
  }

  // Sends `function` to the client; `callback` is invoked with the response object.
  // Query identifiers are allocated per task, starting from 1.
  template <class F>
  void send_query(td::tl_object_ptr<td::td_api::Function> function, F callback) {
    auto query_id = current_query_id_++;
    sent_queries_[query_id] = std::forward<F>(callback);
    send_closure(client_->td_client_, &td::ClientActor::request, query_id, std::move(function));
  }

 protected:
  std::map<td::uint64, std::function<void(td::tl_object_ptr<td::td_api::Object>)>> sent_queries_;
  TestClient *client_ = nullptr;
  td::uint64 current_query_id_ = 1;

  // Hook called from start_listen(); does nothing by default.
  virtual void start_up() {
  }

  // Asks the client to remove this task from its listeners.
  void stop() {
    client_->remove_listener(this);
  }
};
class DoAuthentication final : public TestClinetTask {
public:
DoAuthentication(td::string name, td::string phone, td::string code, td::Promise<> promise)
: name_(std::move(name)), phone_(std::move(phone)), code_(std::move(code)), promise_(std::move(promise)) {
}
void start_up() final {
send_query(td::make_tl_object<td::td_api::getOption>("version"),
[](auto res) { LOG(INFO) << td::td_api::to_string(res); });
}
void process_authorization_state(td::tl_object_ptr<td::td_api::Object> authorization_state) {
td::tl_object_ptr<td::td_api::Function> function;
switch (authorization_state->get_id()) {
case td::td_api::authorizationStateWaitPhoneNumber::ID:
function = td::make_tl_object<td::td_api::setAuthenticationPhoneNumber>(phone_, nullptr);
break;
case td::td_api::authorizationStateWaitEmailAddress::ID:
function = td::make_tl_object<td::td_api::setAuthenticationEmailAddress>("alice_test@gmail.com");
break;
case td::td_api::authorizationStateWaitEmailCode::ID:
function = td::make_tl_object<td::td_api::checkAuthenticationEmailCode>(
td::make_tl_object<td::td_api::emailAddressAuthenticationCode>(code_));
break;
case td::td_api::authorizationStateWaitCode::ID:
function = td::make_tl_object<td::td_api::checkAuthenticationCode>(code_);
break;
case td::td_api::authorizationStateWaitRegistration::ID:
function = td::make_tl_object<td::td_api::registerUser>(name_, "", false);
break;
case td::td_api::authorizationStateWaitTdlibParameters::ID: {
auto request = td::td_api::make_object<td::td_api::setTdlibParameters>();
request->use_test_dc_ = true;
request->database_directory_ = name_ + TD_DIR_SLASH;
request->use_message_database_ = true;
request->use_secret_chats_ = true;
request->api_id_ = 94575;
request->api_hash_ = "a3406de8d171bb422bb6ddf3bbd800e2";
request->system_language_code_ = "en";
request->device_model_ = "Desktop";
request->application_version_ = "tdclient-test";
function = std::move(request);
break;
}
case td::td_api::authorizationStateReady::ID:
on_authorization_ready();
return;
default:
LOG(ERROR) << "Unexpected authorization state " << to_string(authorization_state);
UNREACHABLE();
}
send_query(std::move(function), [](auto res) { LOG_CHECK(res->get_id() == td::td_api::ok::ID) << to_string(res); });
}
void on_authorization_ready() {
LOG(INFO) << "Authorization is completed";
stop();
}
private:
td::string name_;
td::string phone_;
td::string code_;
td::Promise<> promise_;
void process_update(std::shared_ptr<TestClient::Update> update) final {
if (!update->object) {
return;
}
if (update->object->get_id() == td::td_api::updateAuthorizationState::ID) {
auto update_authorization_state = td::move_tl_object_as<td::td_api::updateAuthorizationState>(update->object);
process_authorization_state(std::move(update_authorization_state->authorization_state_));
}
}
};
class SetUsername final : public TestClinetTask {
public:
SetUsername(td::string username, td::Promise<> promise)
: username_(std::move(username)), promise_(std::move(promise)) {
}
private:
td::string username_;
td::Promise<> promise_;
td::int64 self_id_ = 0;
td::string tag_;
void start_up() final {
send_query(td::make_tl_object<td::td_api::getMe>(), [this](auto res) { this->process_me_user(std::move(res)); });
}
void process_me_user(td::tl_object_ptr<td::td_api::Object> res) {
CHECK(res->get_id() == td::td_api::user::ID);
auto user = td::move_tl_object_as<td::td_api::user>(res);
self_id_ = user->id_;
auto current_username = user->usernames_ != nullptr ? user->usernames_->editable_username_ : td::string();
if (current_username != username_) {
LOG(INFO) << "Set username: " << username_;
send_query(td::make_tl_object<td::td_api::setUsername>(username_), [this](auto res) {
CHECK(res->get_id() == td::td_api::ok::ID);
this->send_self_message();
});
} else {
send_self_message();
}
}
void send_self_message() {
tag_ = PSTRING() << td::format::as_hex(td::Random::secure_int64());
send_query(td::make_tl_object<td::td_api::createPrivateChat>(self_id_, false), [this](auto res) {
CHECK(res->get_id() == td::td_api::chat::ID);
auto chat = td::move_tl_object_as<td::td_api::chat>(res);
this->send_query(td::make_tl_object<td::td_api::sendMessage>(
chat->id_, 0, nullptr, nullptr, nullptr,
td::make_tl_object<td::td_api::inputMessageText>(
td::make_tl_object<td::td_api::formattedText>(PSTRING() << tag_ << " INIT", td::Auto()),
nullptr, false)),
[](auto res) {});
});
}
void process_update(std::shared_ptr<TestClient::Update> update) final {
if (!update->object) {
return;
}
if (update->object->get_id() == td::td_api::updateMessageSendSucceeded::ID) {
auto updateNewMessage = td::move_tl_object_as<td::td_api::updateMessageSendSucceeded>(update->object);
auto &message = updateNewMessage->message_;
if (message->content_->get_id() == td::td_api::messageText::ID) {
auto messageText = td::move_tl_object_as<td::td_api::messageText>(message->content_);
auto text = messageText->text_->text_;
if (text.substr(0, tag_.size()) == tag_) {
LOG(INFO) << "Receive self-message";
return stop();
}
}
}
}
};
// Task that waits for 20 incoming text messages starting with `tag` and checks that their texts arrive
// in strictly increasing lexicographic order.
// NOTE(review): promise_ is never set explicitly here; presumably it fires when the task is destroyed - confirm.
class CheckTestA final : public TestClinetTask {
 public:
  CheckTestA(td::string tag, td::Promise<> promise) : tag_(std::move(tag)), promise_(std::move(promise)) {
  }

 private:
  td::string tag_;
  td::Promise<> promise_;
  td::string previous_text_;
  int cnt_ = 20;  // number of tagged messages still expected

  void process_update(std::shared_ptr<TestClient::Update> update) final {
    // The Update is shared between all listeners of a client and its object may already have been moved
    // out by a query callback or by another listener, so it must be checked before being dereferenced.
    if (!update->object) {
      return;
    }
    if (update->object->get_id() == td::td_api::updateNewMessage::ID) {
      auto updateNewMessage = td::move_tl_object_as<td::td_api::updateNewMessage>(update->object);
      auto &message = updateNewMessage->message_;
      if (message->content_->get_id() == td::td_api::messageText::ID) {
        auto messageText = td::move_tl_object_as<td::td_api::messageText>(message->content_);
        auto text = messageText->text_->text_;
        if (text.substr(0, tag_.size()) == tag_) {
          LOG_CHECK(text > previous_text_) << td::tag("now", text) << td::tag("previous", previous_text_);
          previous_text_ = text;
          cnt_--;
          LOG(INFO) << "Receive " << td::tag("text", text) << td::tag("left", cnt_);
          if (cnt_ == 0) {
            return stop();
          }
        }
      }
    }
  }
};
// Task that looks up the public chat `username` and sends it 20 text messages "<tag> 1000" ... "<tag> 1019".
// The task stops as soon as the first response to one of the sendMessage requests arrives.
class TestA final : public TestClinetTask {
 public:
  TestA(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) {
  }

  void start_up() final {
    send_query(td::make_tl_object<td::td_api::searchPublicChat>(username_), [this](auto res) {
      CHECK(res->get_id() == td::td_api::chat::ID);
      auto chat = td::move_tl_object_as<td::td_api::chat>(res);
      for (int i = 0; i < 20; i++) {
        // The callback is stored in sent_queries_ and runs after this lambda has returned, so it captures
        // only `this` instead of using a by-reference default capture of the enclosing frame.
        this->send_query(
            td::make_tl_object<td::td_api::sendMessage>(
                chat->id_, 0, nullptr, nullptr, nullptr,
                td::make_tl_object<td::td_api::inputMessageText>(
                    td::make_tl_object<td::td_api::formattedText>(PSTRING() << tag_ << " " << (1000 + i), td::Auto()),
                    nullptr, false)),
            [this](auto res) { this->stop(); });
      }
    });
  }

 private:
  td::string tag_;
  td::string username_;
};
class TestSecretChat final : public TestClinetTask {
public:
TestSecretChat(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) {
}
void start_up() final {
auto f = [this](auto res) {
CHECK(res->get_id() == td::td_api::chat::ID);
auto chat = td::move_tl_object_as<td::td_api::chat>(res);
this->chat_id_ = chat->id_;
this->secret_chat_id_ = td::move_tl_object_as<td::td_api::chatTypeSecret>(chat->type_)->secret_chat_id_;
};
send_query(td::make_tl_object<td::td_api::searchPublicChat>(username_), [this, f = std::move(f)](auto res) mutable {
CHECK(res->get_id() == td::td_api::chat::ID);
auto chat = td::move_tl_object_as<td::td_api::chat>(res);
CHECK(chat->type_->get_id() == td::td_api::chatTypePrivate::ID);
auto info = td::move_tl_object_as<td::td_api::chatTypePrivate>(chat->type_);
this->send_query(td::make_tl_object<td::td_api::createNewSecretChat>(info->user_id_), std::move(f));
});
}
void process_update(std::shared_ptr<TestClient::Update> update) final {
if (!update->object) {
return;
}
if (update->object->get_id() == td::td_api::updateSecretChat::ID) {
auto update_secret_chat = td::move_tl_object_as<td::td_api::updateSecretChat>(update->object);
if (update_secret_chat->secret_chat_->id_ != secret_chat_id_ ||
update_secret_chat->secret_chat_->state_->get_id() != td::td_api::secretChatStateReady::ID) {
return;
}
LOG(INFO) << "Send encrypted messages";
for (int i = 0; i < 20; i++) {
send_query(
td::make_tl_object<td::td_api::sendMessage>(
chat_id_, 0, nullptr, nullptr, nullptr,
td::make_tl_object<td::td_api::inputMessageText>(
td::make_tl_object<td::td_api::formattedText>(PSTRING() << tag_ << " " << (1000 + i), td::Auto()),
nullptr, false)),
[](auto res) {});
}
}
}
private:
td::string tag_;
td::string username_;
td::int64 secret_chat_id_ = 0;
td::int64 chat_id_ = 0;
};
// Task that answers a "<tag> ONE_FILE" text message by sending two documents whose content is generated
// on demand: it handles updateFileGenerationStart for the "square" and "thumbnail" conversions.
class TestFileGenerated final : public TestClinetTask {
 public:
  TestFileGenerated(td::string tag, td::string username) : tag_(std::move(tag)), username_(std::move(username)) {
  }

  void start_up() final {
  }

  void process_update(std::shared_ptr<TestClient::Update> update) final {
    if (!update->object) {
      return;
    }
    if (update->object->get_id() == td::td_api::updateNewMessage::ID) {
      auto updateNewMessage = td::move_tl_object_as<td::td_api::updateNewMessage>(update->object);
      auto &message = updateNewMessage->message_;
      // The chat of the latest incoming message is the one the generated files are sent to.
      chat_id_ = message->chat_id_;
      if (message->content_->get_id() == td::td_api::messageText::ID) {
        auto messageText = td::move_tl_object_as<td::td_api::messageText>(message->content_);
        auto text = messageText->text_->text_;
        if (text.substr(0, tag_.size()) == tag_) {
          // The command follows the tag and a one-character separator. A text equal to the bare tag has
          // nothing after it, and substr(tag_.size() + 1) would then be out of range, so check the length first.
          if (text.size() > tag_.size() && text.substr(tag_.size() + 1) == "ONE_FILE") {
            return one_file();
          }
        }
      }
    } else if (update->object->get_id() == td::td_api::updateFileGenerationStart::ID) {
      auto info = td::move_tl_object_as<td::td_api::updateFileGenerationStart>(update->object);
      generate_file(info->generation_id_, info->original_path_, info->destination_path_, info->conversion_);
    } else if (update->object->get_id() == td::td_api::updateFile::ID) {
      auto file = td::move_tl_object_as<td::td_api::updateFile>(update->object);
      LOG(INFO) << to_string(file);
    }
  }

  // Writes the numbers 1..99999, one per line, to test_documents/a.txt and sends two documents generated
  // from that file: one with a generated thumbnail and one without a thumbnail.
  void one_file() {
    LOG(ERROR) << "Start one_file test";
    auto file_path = PSTRING() << "test_documents" << TD_DIR_SLASH << "a.txt";
    td::mkpath(file_path).ensure();
    auto raw_file =
        td::FileFd::open(file_path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
            .move_as_ok();
    auto file = td::BufferedFd<td::FileFd>(std::move(raw_file));
    for (int i = 1; i < 100000; i++) {
      file.write(PSLICE() << i << "\n").ensure();
    }
    file.flush_write().ensure();  // important
    file.close();
    send_query(td::make_tl_object<td::td_api::sendMessage>(
                   chat_id_, 0, nullptr, nullptr, nullptr,
                   td::make_tl_object<td::td_api::inputMessageDocument>(
                       td::make_tl_object<td::td_api::inputFileGenerated>(file_path, "square", 0),
                       td::make_tl_object<td::td_api::inputThumbnail>(
                           td::make_tl_object<td::td_api::inputFileGenerated>(file_path, "thumbnail", 0), 0, 0),
                       true, td::make_tl_object<td::td_api::formattedText>(tag_, td::Auto()))),
               [](auto res) { check_td_error(res); });
    send_query(td::make_tl_object<td::td_api::sendMessage>(
                   chat_id_, 0, nullptr, nullptr, nullptr,
                   td::make_tl_object<td::td_api::inputMessageDocument>(
                       td::make_tl_object<td::td_api::inputFileGenerated>(file_path, "square", 0), nullptr, true,
                       td::make_tl_object<td::td_api::formattedText>(tag_, td::Auto()))),
               [](auto res) { check_td_error(res); });
  }

  // Actor implementing the "square" conversion: it reads unsigned numbers from the original file and
  // writes their squares to the destination file, at most 10000 numbers per loop() call, reporting
  // progress between the batches.
  class GenerateFile final : public td::Actor {
   public:
    GenerateFile(TestClinetTask *parent, td::int64 id, td::string original_path, td::string destination_path,
                 td::string conversion)
        : parent_(parent)
        , id_(id)
        , original_path_(std::move(original_path))
        , destination_path_(std::move(destination_path))
        , conversion_(std::move(conversion)) {
    }

   private:
    TestClinetTask *parent_;
    td::int64 id_;
    td::string original_path_;
    td::string destination_path_;
    td::string conversion_;
    FILE *from = nullptr;
    FILE *to = nullptr;

    void start_up() final {
      from = std::fopen(original_path_.c_str(), "rb");
      CHECK(from);
      to = std::fopen(destination_path_.c_str(), "wb");
      CHECK(to);
      yield();
    }

    void loop() final {
      int cnt = 0;
      while (true) {
        td::uint32 x;
        auto r = std::fscanf(from, "%u", &x);
        if (r != 1) {
          // No more numbers can be read: the generation is finished in tear_down().
          return stop();
        }
        std::fprintf(to, "%u\n", x * x);
        if (++cnt >= 10000) {
          break;
        }
      }
      auto ready = std::ftell(to);
      LOG(ERROR) << "Ready: " << ready;
      parent_->send_query(td::make_tl_object<td::td_api::setFileGenerationProgress>(
                              id_, 1039823 /*yeah, exact size of this file*/, td::narrow_cast<td::int32>(ready)),
                          [](auto result) { check_td_error(result); });
      set_timeout_in(0.02);
    }

    void tear_down() final {
      std::fclose(from);
      std::fclose(to);
      parent_->send_query(td::make_tl_object<td::td_api::finishFileGeneration>(id_, nullptr),
                          [](auto result) { check_td_error(result); });
    }
  };

  // Starts the requested conversion: "square" is delegated to a GenerateFile actor, "thumbnail" writes the
  // decoded embedded thumbnail and finishes immediately; any other conversion is a fatal error.
  void generate_file(td::int64 id, const td::string &original_path, const td::string &destination_path,
                     const td::string &conversion) {
    LOG(ERROR) << "Generate file " << td::tag("id", id) << td::tag("original_path", original_path)
               << td::tag("destination_path", destination_path) << td::tag("conversion", conversion);
    if (conversion == "square") {
      td::create_actor<GenerateFile>("GenerateFile", this, id, original_path, destination_path, conversion).release();
    } else if (conversion == "thumbnail") {
      td::write_file(destination_path, td::base64url_decode(td::Slice(thumbnail, thumbnail_size)).ok()).ensure();
      send_query(td::make_tl_object<td::td_api::finishFileGeneration>(id, nullptr),
                 [](auto result) { check_td_error(result); });
    } else {
      LOG(FATAL) << "Unknown " << td::tag("conversion", conversion);
    }
  }

 private:
  td::string tag_;
  td::string username_;
  td::int64 chat_id_ = 0;
};
class CheckTestC final : public TestClinetTask {
public:
CheckTestC(td::string username, td::string tag, td::Promise<> promise)
: username_(std::move(username)), tag_(std::move(tag)), promise_(std::move(promise)) {
}
void start_up() final {
send_query(td::make_tl_object<td::td_api::searchPublicChat>(username_), [this](auto res) {
CHECK(res->get_id() == td::td_api::chat::ID);
auto chat = td::move_tl_object_as<td::td_api::chat>(res);
chat_id_ = chat->id_;
this->one_file();
});
}
private:
td::string username_;
td::string tag_;
td::Promise<> promise_;
td::int64 chat_id_ = 0;
void one_file() {
send_query(td::make_tl_object<td::td_api::sendMessage>(
chat_id_, 0, nullptr, nullptr, nullptr,
td::make_tl_object<td::td_api::inputMessageText>(
td::make_tl_object<td::td_api::formattedText>(PSTRING() << tag_ << " ONE_FILE", td::Auto()),
nullptr, false)),
[](auto res) { check_td_error(res); });
}
void process_update(std::shared_ptr<TestClient::Update> update) final {
if (!update->object) {
return;
}
if (update->object->get_id() == td::td_api::updateNewMessage::ID) {
auto updateNewMessage = td::move_tl_object_as<td::td_api::updateNewMessage>(update->object);
auto &message = updateNewMessage->message_;
if (message->content_->get_id() == td::td_api::messageDocument::ID) {
auto messageDocument = td::move_tl_object_as<td::td_api::messageDocument>(message->content_);
auto text = messageDocument->caption_->text_;
if (text.substr(0, tag_.size()) == tag_) {
file_id_to_check_ = messageDocument->document_->document_->id_;
LOG(ERROR) << "Receive file " << to_string(messageDocument->document_->document_);
send_query(td::make_tl_object<td::td_api::downloadFile>(file_id_to_check_, 1, 0, 0, false),
[](auto res) { check_td_error(res); });
}
}
} else if (update->object->get_id() == td::td_api::updateFile::ID) {
auto updateFile = td::move_tl_object_as<td::td_api::updateFile>(update->object);
if (updateFile->file_->id_ == file_id_to_check_ && (updateFile->file_->local_->is_downloading_completed_)) {
check_file(updateFile->file_->local_->path_);
}
}
}
void check_file(td::CSlice path) {
FILE *from = std::fopen(path.c_str(), "rb");
CHECK(from);
td::uint32 x;
td::uint32 y = 1;
while (std::fscanf(from, "%u", &x) == 1) {
CHECK(x == y * y);
y++;
}
std::fclose(from);
stop();
}
td::int32 file_id_to_check_ = 0;
};
class LoginTestActor final : public td::Actor {
public:
explicit LoginTestActor(td::Status *status) : status_(status) {
*status_ = td::Status::OK();
}
private:
td::Status *status_;
td::ActorOwn<TestClient> alice_;
td::ActorOwn<TestClient> bob_;
td::string alice_phone_ = "9996636437";
td::string bob_phone_ = "9996636438";
td::string alice_username_ = "alice_" + alice_phone_;
td::string bob_username_ = "bob_" + bob_phone_;
td::string stage_name_;
void begin_stage(td::string stage_name, double timeout) {
LOG(WARNING) << "Begin stage '" << stage_name << "'";
stage_name_ = std::move(stage_name);
set_timeout_in(timeout);
}
void start_up() final {
begin_stage("Logging in", 160);
alice_ = td::create_actor<TestClient>("AliceClient", "alice");
bob_ = td::create_actor<TestClient>("BobClient", "bob");
td::send_closure(alice_, &TestClient::add_listener,
td::make_unique<DoAuthentication>(
"alice", alice_phone_, "33333",
td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec))));
td::send_closure(bob_, &TestClient::add_listener,
td::make_unique<DoAuthentication>(
"bob", bob_phone_, "33333",
td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec))));
}
int start_up_fence_ = 3;
void start_up_fence_dec() {
--start_up_fence_;
if (start_up_fence_ == 0) {
init();
} else if (start_up_fence_ == 1) {
return init();
class WaitActor final : public td::Actor {
public:
WaitActor(double timeout, td::Promise<> promise) : timeout_(timeout), promise_(std::move(promise)) {
}
void start_up() final {
set_timeout_in(timeout_);
}
void timeout_expired() final {
stop();
}
private:
double timeout_;
td::Promise<> promise_;
};
td::create_actor<WaitActor>("WaitActor", 2,
td::create_event_promise(self_closure(this, &LoginTestActor::start_up_fence_dec)))
.release();
}
}
void init() {
td::send_closure(alice_, &TestClient::add_listener,
td::make_unique<SetUsername>(alice_username_, td::create_event_promise(self_closure(
this, &LoginTestActor::init_fence_dec))));
td::send_closure(bob_, &TestClient::add_listener,
td::make_unique<SetUsername>(
bob_username_, td::create_event_promise(self_closure(this, &LoginTestActor::init_fence_dec))));
}
int init_fence_ = 2;
void init_fence_dec() {
if (--init_fence_ == 0) {
test_a();
}
}
int test_a_fence_ = 2;
void test_a_fence() {
if (--test_a_fence_ == 0) {
test_b();
}
}
void test_a() {
begin_stage("Ready to create chats", 80);
td::string alice_tag = PSTRING() << td::format::as_hex(td::Random::secure_int64());
td::string bob_tag = PSTRING() << td::format::as_hex(td::Random::secure_int64());
td::send_closure(bob_, &TestClient::add_listener,
td::make_unique<CheckTestA>(
alice_tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_a_fence))));
td::send_closure(alice_, &TestClient::add_listener,
td::make_unique<CheckTestA>(
bob_tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_a_fence))));
td::send_closure(alice_, &TestClient::add_listener, td::make_unique<TestA>(alice_tag, bob_username_));
td::send_closure(bob_, &TestClient::add_listener, td::make_unique<TestA>(bob_tag, alice_username_));
// td::send_closure(alice_, &TestClient::add_listener, td::make_unique<TestChat>(bob_username_));
}
void timeout_expired() final {
LOG(FATAL) << "Timeout expired in stage '" << stage_name_ << "'";
}
int test_b_fence_ = 1;
void test_b_fence() {
if (--test_b_fence_ == 0) {
test_c();
}
}
int test_c_fence_ = 1;
void test_c_fence() {
if (--test_c_fence_ == 0) {
finish();
}
}
void test_b() {
begin_stage("Create secret chat", 40);
td::string tag = PSTRING() << td::format::as_hex(td::Random::secure_int64());
td::send_closure(
bob_, &TestClient::add_listener,
td::make_unique<CheckTestA>(tag, td::create_event_promise(self_closure(this, &LoginTestActor::test_b_fence))));
td::send_closure(alice_, &TestClient::add_listener, td::make_unique<TestSecretChat>(tag, bob_username_));
}
void test_c() {
begin_stage("Send generated file", 240);
td::string tag = PSTRING() << td::format::as_hex(td::Random::secure_int64());
td::send_closure(
bob_, &TestClient::add_listener,
td::make_unique<CheckTestC>(alice_username_, tag,
td::create_event_promise(self_closure(this, &LoginTestActor::test_c_fence))));
td::send_closure(alice_, &TestClient::add_listener, td::make_unique<TestFileGenerated>(tag, bob_username_));
}
int finish_fence_ = 2;
void finish_fence() {
finish_fence_--;
if (finish_fence_ == 0) {
td::Scheduler::instance()->finish();
stop();
}
}
void finish() {
td::send_closure(alice_, &TestClient::close,
td::create_event_promise(self_closure(this, &LoginTestActor::finish_fence)));
td::send_closure(bob_, &TestClient::close,
td::create_event_promise(self_closure(this, &LoginTestActor::finish_fence)));
}
};
// Step-based test wrapper that runs LoginTestActor inside its own ConcurrentScheduler.
class Tdclient_login final : public td::Test {
 public:
  using Test::Test;

  // Returns true while the scheduler still has work to do. Once run_main() reports completion, the
  // scheduler is finished, the collected status is checked and false is returned.
  bool step() final {
    if (!is_inited_) {
      // The first step creates the test actor and starts the scheduler.
      sched_.create_actor_unsafe<LoginTestActor>(0, "LoginTestActor", &result_).release();
      sched_.start();
      is_inited_ = true;
    }

    if (sched_.run_main(10)) {
      return true;
    }

    sched_.finish();
    if (result_.is_error()) {
      LOG(ERROR) << result_;
    }
    ASSERT_TRUE(result_.is_ok());
    return false;
  }

 private:
  bool is_inited_ = false;
  td::ConcurrentScheduler sched_{4, 0};
  td::Status result_;
};
//RegisterTest<Tdclient_login> Tdclient_login("Tdclient_login");
// Sends a single testSquareInt(3) request through td::Client and expects the answer 9.
TEST(Client, Simple) {
  td::Client client;
  // client.execute({1, td::td_api::make_object<td::td_api::setLogTagVerbosityLevel>("actor", 1)});
  client.send({3, td::make_tl_object<td::td_api::testSquareInt>(3)});
  while (true) {
    auto result = client.receive(10);
    // Everything received before the answer to request 3 is skipped.
    if (result.id == 3) {
      // Verify the object type before downcasting, as Client.SimpleMulti does.
      CHECK(result.object->get_id() == td::td_api::testInt::ID);
      auto test_int = td::td_api::move_object_as<td::td_api::testInt>(result.object);
      ASSERT_EQ(test_int->value_, 9);
      break;
    }
  }
}
// Sends testSquareInt(3) through 7 independent clients, randomly closing some of them right after the
// request, and expects every client to answer with 9.
TEST(Client, SimpleMulti) {
  std::vector<td::Client> clients(7);
  //for (auto &client : clients) {
  //client.execute({1, td::td_api::make_object<td::td_api::setLogTagVerbosityLevel>("td_requests", 1)});
  //}

  // Client number i uses request identifier i + 2; identifier 1 is used for the close request.
  for (size_t i = 0; i < clients.size(); i++) {
    auto &client = clients[i];
    client.send({i + 2, td::make_tl_object<td::td_api::testSquareInt>(3)});
    if (td::Random::fast_bool()) {
      client.send({1, td::make_tl_object<td::td_api::close>()});
    }
  }

  for (size_t i = 0; i < clients.size(); i++) {
    for (;;) {
      auto result = clients[i].receive(10);
      if (result.id != i + 2) {
        // Not the answer to this client's request yet.
        continue;
      }
      CHECK(result.object->get_id() == td::td_api::testInt::ID);
      auto test_int = td::td_api::move_object_as<td::td_api::testInt>(result.object);
      ASSERT_EQ(test_int->value_, 9);
      break;
    }
  }
}
#if !TD_THREAD_UNSUPPORTED
// Runs 4 threads, each creating 1000 clients one after another. Every client must answer its request
// and report authorizationStateClosed, which gives 2 * 4 * 1000 counted events in total.
TEST(Client, Multi) {
  td::vector<td::thread> threads;
  std::atomic<int> ok_count{0};
  for (int thread_index = 0; thread_index < 4; thread_index++) {
    threads.emplace_back([thread_index, &ok_count] {
      for (int iteration = 0; iteration < 1000; iteration++) {
        td::Client client;
        // Odd iterations close the client right after the request, even ones only after the answer.
        const bool close_immediately = (iteration & 1) != 0;
        auto request_id = static_cast<td::uint64>(iteration + 2 + 1000 * thread_index);
        client.send({request_id, td::make_tl_object<td::td_api::testSquareInt>(3)});
        if (close_immediately) {
          client.send({1, td::make_tl_object<td::td_api::close>()});
        }
        for (;;) {
          auto result = client.receive(10);
          if (result.id == request_id) {
            ok_count++;
            if (!close_immediately) {
              client.send({1, td::make_tl_object<td::td_api::close>()});
            }
          }
          // Only an update (id 0) carrying updateAuthorizationState can end the loop.
          if (result.id != 0 || result.object == nullptr) {
            continue;
          }
          if (result.object->get_id() != td::td_api::updateAuthorizationState::ID) {
            continue;
          }
          const auto *state_update = static_cast<const td::td_api::updateAuthorizationState *>(result.object.get());
          if (state_update->authorization_state_->get_id() == td::td_api::authorizationStateClosed::ID) {
            ok_count++;
            break;
          }
        }
      }
    });
  }
  for (auto &thread : threads) {
    thread.join();
  }
  ASSERT_EQ(8 * 1000, ok_count.load());
}
// Creates many clients through one td::ClientManager from several threads, sends one testSquareInt
// request to all but the first client created by each thread and waits until every such client has
// answered exactly once. Requests to the invalid client identifiers 0 and -1 must fail with code 400.
TEST(Client, Manager) {
  td::vector<td::thread> threads;
  td::ClientManager client;
#if !TD_EVENTFD_UNSUPPORTED  // Client must be used from a single thread if there is no EventFd
  int threads_n = 4;
#else
  int threads_n = 1;
#endif
  int clients_n = 1000;
  client.send(0, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
  client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
  for (int i = 0; i < threads_n; i++) {
    threads.emplace_back([&] {
      // clients_n + 1 identifiers are created; the first one never receives a request.
      for (int j = 0; j <= clients_n; j++) {
        auto id = client.create_client_id();
        if (j != 0) {
          client.send(id, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
        }
      }
    });
  }
  for (auto &thread : threads) {
    thread.join();
  }
  std::set<td::int32> ids;
  while (ids.size() != static_cast<size_t>(threads_n) * clients_n) {
    auto event = client.receive(10);
    if (event.object == nullptr) {
      // Nothing was received; without this guard an event with client_id 0 and no object would be
      // dereferenced below. The other ClientManager tests skip empty responses the same way.
      continue;
    }
    if (event.client_id == 0 || event.client_id == -1) {
      ASSERT_EQ(td::td_api::error::ID, event.object->get_id());
      ASSERT_EQ(400, static_cast<td::td_api::error &>(*event.object).code_);
      continue;
    }
    if (event.request_id == 3) {
      ASSERT_EQ(td::td_api::testInt::ID, event.object->get_id());
      // Every client must answer only once.
      ASSERT_TRUE(ids.insert(event.client_id).second);
    }
  }
}
#if !TD_EVENTFD_UNSUPPORTED // Client must be used from a single thread if there is no EventFd
// Checks that every request sent to a td::Client gets exactly one response even when the client is
// closed while another thread keeps sending requests.
TEST(Client, Close) {
std::atomic<bool> stop_send{false};
std::atomic<bool> can_stop_receive{false};
// Both the counter and the identifier set start with the close request (id 1) sent by the main thread below.
std::atomic<td::int64> send_count{1};
std::atomic<td::int64> receive_count{0};
td::Client client;
std::mutex request_ids_mutex;
std::set<td::uint64> request_ids;
request_ids.insert(1);
// Sender: keeps sending testSquareInt requests with increasing identifiers until stop_send is set.
td::thread send_thread([&] {
td::uint64 request_id = 2;
while (!stop_send.load()) {
{
// The identifier is recorded before the request is sent, so the receiver can always find it.
std::unique_lock<std::mutex> guard(request_ids_mutex);
request_ids.insert(request_id);
}
client.send({request_id++, td::make_tl_object<td::td_api::testSquareInt>(3)});
send_count++;
}
can_stop_receive = true;
});
// Receiver: counts responses and erases their identifiers; decides when the sender has to stop.
td::thread receive_thread([&] {
auto max_continue_send = td::Random::fast_bool() ? 0 : 1000;
while (true) {
auto response = client.receive(10.0);
if (response.object == nullptr) {
// An empty response (presumably a receive timeout) first stops the sender; a second one ends the thread.
if (!stop_send) {
stop_send = true;
} else {
return;
}
}
if (response.id > 0) {
// Once requests start failing with error code 500, stop the sender, either immediately or after a
// random number of further such errors, depending on max_continue_send.
if (!stop_send && response.object->get_id() == td::td_api::error::ID &&
static_cast<td::td_api::error &>(*response.object).code_ == 500 &&
td::Random::fast(0, max_continue_send) == 0) {
stop_send = true;
}
receive_count++;
{
std::unique_lock<std::mutex> guard(request_ids_mutex);
size_t erased_count = request_ids.erase(response.id);
// Each response must correspond to a request that is still outstanding.
CHECK(erased_count > 0);
}
}
if (can_stop_receive && receive_count == send_count) {
break;
}
}
});
// Close the client after a random delay of 0, 1000 or 50000 microseconds.
td::usleep_for((td::Random::fast_bool() ? 0 : 1000) * (td::Random::fast_bool() ? 1 : 50));
client.send({1, td::make_tl_object<td::td_api::close>()});
send_thread.join();
receive_thread.join();
ASSERT_EQ(send_count.load(), receive_count.load());
ASSERT_TRUE(request_ids.empty());
}
// Same scenario as Client.Close, but using a td::ClientManager with a single client: every request
// must get exactly one response even when the client is closed while requests are still being sent.
TEST(Client, ManagerClose) {
std::atomic<bool> stop_send{false};
std::atomic<bool> can_stop_receive{false};
// Both the counter and the identifier set start with the close request (id 1) sent by the main thread below.
std::atomic<td::int64> send_count{1};
std::atomic<td::int64> receive_count{0};
td::ClientManager client_manager;
auto client_id = client_manager.create_client_id();
std::mutex request_ids_mutex;
std::set<td::uint64> request_ids;
request_ids.insert(1);
// Sender: keeps sending testSquareInt requests with increasing identifiers until stop_send is set.
td::thread send_thread([&] {
td::uint64 request_id = 2;
while (!stop_send.load()) {
{
// The identifier is recorded before the request is sent, so the receiver can always find it.
std::unique_lock<std::mutex> guard(request_ids_mutex);
request_ids.insert(request_id);
}
client_manager.send(client_id, request_id++, td::make_tl_object<td::td_api::testSquareInt>(3));
send_count++;
}
can_stop_receive = true;
});
// Receiver: counts responses and erases their identifiers; decides when the sender has to stop.
td::thread receive_thread([&] {
auto max_continue_send = td::Random::fast_bool() ? 0 : 1000;
bool can_stop_send = false;
while (true) {
auto response = client_manager.receive(10.0);
if (response.object == nullptr) {
// An empty response (presumably a receive timeout) allows the sender to be stopped; once the sender
// has already been told to stop, another empty response ends the thread.
if (!stop_send) {
can_stop_send = true;
} else {
return;
}
}
// After the first empty response, let up to max_continue_send further loop iterations pass before
// stopping the sender.
if (can_stop_send && max_continue_send-- <= 0) {
stop_send = true;
}
if (response.request_id > 0) {
receive_count++;
{
std::unique_lock<std::mutex> guard(request_ids_mutex);
size_t erased_count = request_ids.erase(response.request_id);
// Each response must correspond to a request that is still outstanding.
CHECK(erased_count > 0);
}
}
if (can_stop_receive && receive_count == send_count) {
break;
}
}
});
// Close the client after a random delay of 0, 1000 or 50000 microseconds.
td::usleep_for((td::Random::fast_bool() ? 0 : 1000) * (td::Random::fast_bool() ? 1 : 50));
client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
send_thread.join();
receive_thread.join();
ASSERT_EQ(send_count.load(), receive_count.load());
ASSERT_TRUE(request_ids.empty());
}
#endif
#endif
// Single-threaded check of the error codes returned by td::ClientManager for requests sent to invalid
// client identifiers (400), to already closed clients (500) and to a live client (no error), repeated
// for three consecutively created clients.
TEST(Client, ManagerCloseOneThread) {
  td::ClientManager client_manager;

  td::uint64 request_id = 2;  // identifier 1 is reserved for the close request
  std::map<td::uint64, td::int32> sent_requests;  // request identifier -> expected error code, 0 for success
  td::uint64 sent_count = 0;
  td::uint64 receive_count = 0;

  // Sends testSquareInt(3) to the given client and records the expected outcome.
  auto send_request = [&](td::int32 client_id, td::int32 expected_error_code) {
    sent_count++;
    sent_requests.emplace(request_id, expected_error_code);
    client_manager.send(client_id, request_id++, td::make_tl_object<td::td_api::testSquareInt>(3));
  };

  // Receives until every sent request has been answered, verifying each answer against the expectation.
  auto receive = [&] {
    while (receive_count != sent_count) {
      auto response = client_manager.receive(1.0);
      if (response.object == nullptr || response.request_id <= 0) {
        // Nothing received, or an update rather than a response to a request.
        continue;
      }
      receive_count++;
      auto it = sent_requests.find(response.request_id);
      CHECK(it != sent_requests.end());
      const auto expected_error_code = it->second;
      sent_requests.erase(it);

      if (expected_error_code != 0) {
        ASSERT_EQ(td::td_api::error::ID, response.object->get_id());
        ASSERT_EQ(expected_error_code, static_cast<td::td_api::error &>(*response.object).code_);
      } else if (response.request_id == 1) {
        // The close request is answered with ok.
        ASSERT_EQ(td::td_api::ok::ID, response.object->get_id());
      } else {
        ASSERT_EQ(td::td_api::testInt::ID, response.object->get_id());
      }
    }
  };

  for (int round = 0; round < 3; round++) {
    // Non-positive client identifiers are always invalid.
    for (td::int32 i = -5; i <= 0; i++) {
      send_request(i, 400);
    }
    receive();

    auto client_id = client_manager.create_client_id();
    for (td::int32 i = -5; i < 5; i++) {
      // The new client succeeds, smaller positive identifiers (the clients of the previous rounds)
      // are expected to fail with 500 and everything else with 400.
      td::int32 expected_error_code = 400;
      if (i == client_id) {
        expected_error_code = 0;
      } else if (i > 0 && i < client_id) {
        expected_error_code = 500;
      }
      send_request(i, expected_error_code);
    }
    receive();

    for (int i = 0; i < 10; i++) {
      send_request(client_id, 0);
    }
    receive();

    // Close the client; requests sent after the close request must fail with 500.
    sent_count++;
    sent_requests.emplace(1, 0);
    client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
    for (int i = 0; i < 10; i++) {
      send_request(client_id, 500);
    }
    receive();

    for (int i = 0; i < 10; i++) {
      send_request(client_id, 500);
    }
    receive();
  }

  ASSERT_TRUE(sent_requests.empty());
}
// Hand-written PartsManager cases: both init() calls below are expected to return an error.
TEST(PartsManager, hands) {
  {
    td::PartsManager first_manager;
    auto init_status = first_manager.init(0, 100000, false, 10, {0, 1, 2}, false, true);
    init_status.ensure_error();
    //first_manager.set_known_prefix(0, false).ensure();
  }
  {
    td::PartsManager second_manager;
    auto init_status = second_manager.init(1, 100000, true, 10, {0, 1, 2}, false, true);
    init_status.ensure_error();
  }
}