Merge commit '3fc140b4a260c606e5c6d79ef148d4beeb37048a'

Conflicts:
	td/telegram/Client.cpp
	td/telegram/Client.h
This commit is contained in:
Andrea Cavalli 2020-10-12 15:41:34 +02:00
commit ccaec7e042
6 changed files with 167 additions and 67 deletions

View File

@ -47,10 +47,11 @@ class MultiTd : public Actor {
set_tag(old_tag); set_tag(old_tag);
} }
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { void send(ClientManager::ClientId client_id, ClientManager::RequestId request_id,
td_api::object_ptr<td_api::Function> &&request) {
auto &td = tds_[client_id]; auto &td = tds_[client_id];
CHECK(!td.empty()); CHECK(!td.empty());
send_closure(td, &Td::request, request_id, std::move(function)); send_closure(td, &Td::request, request_id, std::move(request));
} }
void close(int32 td_id) { void close(int32 td_id) {
@ -66,11 +67,11 @@ class MultiTd : public Actor {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
class TdReceiver { class TdReceiver {
public: public:
MultiClient::Response receive(double timeout) { ClientManager::Response receive(double timeout) {
return receive(timeout, true, true); return receive(timeout, true, true);
} }
MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) { ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) {
if (include_responses && !responses_.empty()) { if (include_responses && !responses_.empty()) {
auto result = std::move(responses_.front()); auto result = std::move(responses_.front());
responses_.pop(); responses_.pop();
@ -84,10 +85,10 @@ class TdReceiver {
return {0, 0, nullptr}; return {0, 0, nullptr};
} }
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) { unique_ptr<TdCallback> create_callback(ClientManager::ClientId client_id) {
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) { Callback(ClientManager::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
} }
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
if (id == 0) { if (id == 0) {
@ -117,18 +118,18 @@ class TdReceiver {
} }
private: private:
MultiClient::ClientId client_id_; ClientManager::ClientId client_id_;
TdReceiver *impl_; TdReceiver *impl_;
}; };
return td::make_unique<Callback>(client_id, this); return td::make_unique<Callback>(client_id, this);
} }
private: private:
std::queue<MultiClient::Response> updates_; std::queue<ClientManager::Response> updates_;
std::queue<MultiClient::Response> responses_; std::queue<ClientManager::Response> responses_;
}; };
class MultiClient::Impl final { class ClientManager::Impl final {
public: public:
Impl() { Impl() {
options_.net_query_stats = std::make_shared<NetQueryStats>(); options_.net_query_stats = std::make_shared<NetQueryStats>();
@ -145,11 +146,11 @@ class MultiClient::Impl final {
return client_id; return client_id;
} }
void send(ClientId client_id, RequestId request_id, Function function) { void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
Request request; Request request;
request.client_id = client_id; request.client_id = client_id;
request.id = request_id; request.id = request_id;
request.function = std::move(function); request.request = std::move(request);
requests_.push_back(std::move(request)); requests_.push_back(std::move(request));
} }
@ -163,7 +164,7 @@ class MultiClient::Impl final {
for (auto &request : requests_) { for (auto &request : requests_) {
auto &td = tds_[request.client_id]; auto &td = tds_[request.client_id];
CHECK(!td.empty()); CHECK(!td.empty());
send_closure_later(td, &Td::request, request.id, std::move(request.function)); send_closure_later(td, &Td::request, request.id, std::move(request.request));
} }
requests_.clear(); requests_.clear();
} }
@ -204,9 +205,9 @@ class MultiClient::Impl final {
struct Request { struct Request {
ClientId client_id; ClientId client_id;
RequestId id; RequestId id;
Function function; td_api::object_ptr<td_api::Function> request;
}; };
std::vector<Request> requests_; td::vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_; unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0}; ClientId client_id_{0};
Td::Options options_; Td::Options options_;
@ -220,7 +221,7 @@ class Client::Impl final {
} }
void send(Request request) { void send(Request request) {
impl_.send(client_id_, request.id, std::move(request.function)); impl_.send(client_id_, request.id, std::move(request.request));
} }
Response receive(double timeout) { Response receive(double timeout) {
@ -236,8 +237,8 @@ class Client::Impl final {
} }
private: private:
MultiClient::Impl impl_; ClientManager::Impl impl_;
MultiClient::ClientId client_id_; ClientManager::ClientId client_id_;
}; };
#else #else
@ -251,11 +252,11 @@ class TdReceiver {
output_updates_queue_->init(); output_updates_queue_->init();
} }
MultiClient::Response receive(double timeout) { ClientManager::Response receive(double timeout) {
return receive(timeout, true, true); return receive(timeout, true, true);
} }
MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) { ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) {
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout; VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
bool is_responses_locked = false; bool is_responses_locked = false;
bool is_updates_locked = false; bool is_updates_locked = false;
@ -276,14 +277,14 @@ class TdReceiver {
is_responses_locked = receive_responses_lock_.exchange(false); is_responses_locked = receive_responses_lock_.exchange(false);
CHECK(is_responses_locked); CHECK(is_responses_locked);
} }
VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get(); VLOG(td_requests) << "End to wait for updates, returning object " << response.request_id << ' ' << response.object.get();
return response; return response;
} }
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) { unique_ptr<TdCallback> create_callback(ClientManager::ClientId client_id) {
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_responses_queue, std::shared_ptr<OutputQueue> output_updates_queue) explicit Callback(ClientManager::ClientId client_id, std::shared_ptr<OutputQueue> output_responses_queue, std::shared_ptr<OutputQueue> output_updates_queue)
: client_id_(client_id), output_responses_queue_(std::move(output_responses_queue)), output_updates_queue_(std::move(output_updates_queue)) { : client_id_(client_id), output_responses_queue_(std::move(output_responses_queue)), output_updates_queue_(std::move(output_updates_queue)) {
} }
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
@ -314,7 +315,7 @@ class TdReceiver {
} }
private: private:
MultiClient::ClientId client_id_; ClientManager::ClientId client_id_;
std::shared_ptr<OutputQueue> output_responses_queue_; std::shared_ptr<OutputQueue> output_responses_queue_;
std::shared_ptr<OutputQueue> output_updates_queue_; std::shared_ptr<OutputQueue> output_updates_queue_;
}; };
@ -322,7 +323,7 @@ class TdReceiver {
} }
private: private:
using OutputQueue = MpscPollableQueue<MultiClient::Response>; using OutputQueue = MpscPollableQueue<ClientManager::Response>;
std::shared_ptr<OutputQueue> output_responses_queue_; std::shared_ptr<OutputQueue> output_responses_queue_;
std::shared_ptr<OutputQueue> output_updates_queue_; std::shared_ptr<OutputQueue> output_updates_queue_;
int output_responses_queue_ready_cnt_{0}; int output_responses_queue_ready_cnt_{0};
@ -330,7 +331,7 @@ class TdReceiver {
std::atomic<bool> receive_responses_lock_{false}; std::atomic<bool> receive_responses_lock_{false};
std::atomic<bool> receive_updates_lock_{false}; std::atomic<bool> receive_updates_lock_{false};
MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) { ClientManager::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) {
if (include_responses) { if (include_responses) {
if (output_responses_queue_ready_cnt_ == 0) { if (output_responses_queue_ready_cnt_ == 0) {
output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock(); output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock();
@ -395,9 +396,10 @@ class MultiImpl {
return id; return id;
} }
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { void send(ClientManager::ClientId client_id, ClientManager::RequestId request_id,
td_api::object_ptr<td_api::Function> &&request) {
auto guard = concurrent_scheduler_->get_send_guard(); auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function)); send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(request));
} }
void close(int32 td_id) { void close(int32 td_id) {
@ -456,7 +458,7 @@ class MultiImplPool {
std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>(); std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>();
}; };
class MultiClient::Impl final { class ClientManager::Impl final {
public: public:
ClientId create_client() { ClientId create_client() {
auto impl = pool_.get(); auto impl = pool_.get();
@ -468,11 +470,11 @@ class MultiClient::Impl final {
return client_id; return client_id;
} }
void send(ClientId client_id, RequestId request_id, Function function) { void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
auto lock = impls_mutex_.lock_read().move_as_ok(); auto lock = impls_mutex_.lock_read().move_as_ok();
auto it = impls_.find(client_id); auto it = impls_.find(client_id);
CHECK(it != impls_.end()); CHECK(it != impls_.end());
it->second->send(client_id, request_id, std::move(function)); it->second->send(client_id, request_id, std::move(request));
} }
Response receive(double timeout) { Response receive(double timeout) {
@ -539,7 +541,7 @@ class Client::Impl final {
} }
Client::Response old_res; Client::Response old_res;
old_res.id = res.id; old_res.id = res.request_id;
old_res.object = std::move(res.object); old_res.object = std::move(res.object);
return old_res; return old_res;
} }
@ -590,31 +592,31 @@ Client::~Client() = default;
Client::Client(Client &&other) = default; Client::Client(Client &&other) = default;
Client &Client::operator=(Client &&other) = default; Client &Client::operator=(Client &&other) = default;
MultiClient::MultiClient() : impl_(std::make_unique<Impl>()) { ClientManager::ClientManager() : impl_(std::make_unique<Impl>()) {
} }
MultiClient::ClientId MultiClient::create_client() { ClientManager::ClientId ClientManager::create_client() {
return impl_->create_client(); return impl_->create_client();
} }
void MultiClient::send(ClientId client_id, RequestId request_id, Function &&function) { void ClientManager::send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
impl_->send(client_id, request_id, std::move(function)); impl_->send(client_id, request_id, std::move(request));
} }
MultiClient::Response MultiClient::receive(double timeout) { ClientManager::Response ClientManager::receive(double timeout) {
return impl_->receive(timeout); return impl_->receive(timeout);
} }
MultiClient::Response MultiClient::receive(double timeout, bool include_responses, bool include_updates) { ClientManager::Response ClientManager::receive(double timeout, bool include_responses, bool include_updates) {
return impl_->receive(timeout, include_responses, include_updates); return impl_->receive(timeout, include_responses, include_updates);
} }
MultiClient::Object MultiClient::execute(Function &&function) { td_api::object_ptr<td_api::Object> ClientManager::execute(td_api::object_ptr<td_api::Function> &&request) {
return Td::static_request(std::move(function)); return Td::static_request(std::move(request));
} }
MultiClient::~MultiClient() = default; ClientManager::~ClientManager() = default;
MultiClient::MultiClient(MultiClient &&other) = default; ClientManager::ClientManager(ClientManager &&other) = default;
MultiClient &MultiClient::operator=(MultiClient &&other) = default; ClientManager &ClientManager::operator=(ClientManager &&other) = default;
} // namespace td } // namespace td

View File

@ -22,7 +22,7 @@ namespace td {
* The TDLib instance is created for the lifetime of the Client object. * The TDLib instance is created for the lifetime of the Client object.
* Requests to TDLib can be sent using the Client::send method from any thread. * Requests to TDLib can be sent using the Client::send method from any thread.
* New updates and responses to requests can be received using the Client::receive method from any thread, * New updates and responses to requests can be received using the Client::receive method from any thread,
* this function shouldn't be called simultaneously from two different threads. Also note that all updates and * this function must not be called simultaneously from two different threads. Also note that all updates and
* responses to requests should be applied in the same order as they were received, to ensure consistency. * responses to requests should be applied in the same order as they were received, to ensure consistency.
* Given this information, it's advisable to call this function from a dedicated thread. * Given this information, it's advisable to call this function from a dedicated thread.
* Some service TDLib requests can be executed synchronously from any thread by using the Client::execute method. * Some service TDLib requests can be executed synchronously from any thread by using the Client::execute method.
@ -84,7 +84,7 @@ class Client final {
*/ */
struct Response { struct Response {
/** /**
* TDLib request identifier, which corresponds to the response or 0 for incoming updates from TDLib. * TDLib request identifier, which corresponds to the response, or 0 for incoming updates from TDLib.
*/ */
std::uint64_t id; std::uint64_t id;
@ -142,36 +142,123 @@ class Client final {
std::unique_ptr<Impl> impl_; std::unique_ptr<Impl> impl_;
}; };
// --- EXPERIMENTAL --- /**
class MultiClient final { * The future native C++ interface for interaction with TDLib.
*
* The TDLib client instance is created using the ClientManager::create_client method, returning a client identifier.
* Requests to TDLib can be sent using the ClientManager::send method from any thread.
* New updates and responses to requests can be received using the ClientManager::receive method from any thread,
* this function must not be called simultaneously from two different threads. Also note that all updates and
* responses to requests should be applied in the same order as they were received, to ensure consistency.
* Some TDLib requests can be executed synchronously from any thread by using the ClientManager::execute method.
*
* General pattern of usage:
* \code
* td::ClientManager manager;
* auto client_id = manager.create_client();
* // somehow share the manager and the client_id with other threads,
* // which will be able to send requests via manager.send(client_id, ...)
*
* const double WAIT_TIMEOUT = 10.0; // seconds
* while (true) {
* auto response = manager.receive(WAIT_TIMEOUT);
* if (response.object == nullptr) {
* continue;
* }
*
* if (response.id == 0) {
* // process response.object as an incoming update of type td_api::Update for the client response.client_id
* } else {
* // process response.object as an answer to a request response.request_id for the client response.client_id
* }
* }
* \endcode
*/
class ClientManager final {
public: public:
MultiClient(); /**
* Creates a new TDLib client manager.
*/
ClientManager();
/**
* Opaque TDLib client instance identifier.
*/
using ClientId = std::int32_t; using ClientId = std::int32_t;
using RequestId = std::uint64_t;
using Function = td_api::object_ptr<td_api::Function>;
using Object = td_api::object_ptr<td_api::Object>;
struct Response {
ClientId client_id;
RequestId id;
Object object;
};
/**
* Request identifier.
* Responses to TDLib requests will have the same request id as the corresponding request.
* Updates from TDLib will have request id == 0, incoming requests are thus disallowed to have request id == 0.
*/
using RequestId = std::uint64_t;
/**
* Creates a new TDLib client and returns its opaque identifier.
*/
ClientId create_client(); ClientId create_client();
void send(ClientId client_id, RequestId request_id, Function &&function); /**
* Sends request to TDLib. May be called from any thread.
* \param[in] client_id TDLib client instance identifier.
* \param[in] request_id Request identifier. Must be non-zero.
* \param[in] request Request to TDLib.
*/
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request);
/**
* A response to a request, or an incoming update from TDLib.
*/
struct Response {
/**
* TDLib client instance identifier, for which the response is received.
*/
ClientId client_id;
/**
* Request identifier, to which the response corresponds, or 0 for incoming updates from TDLib.
*/
RequestId request_id;
/**
* TDLib API object representing a response to a TDLib request or an incoming update.
*/
td_api::object_ptr<td_api::Object> object;
};
/**
* Receives incoming updates and request responses from TDLib. May be called from any thread, but must not be
* called simultaneously from two different threads.
* \param[in] timeout The maximum number of seconds allowed for this function to wait for new data.
* \return An incoming update or request response. The object returned in the response may be a nullptr
* if the timeout expires.
*/
Response receive(double timeout); Response receive(double timeout);
Response receive(double timeout, bool include_responses, bool include_updates); Response receive(double timeout, bool include_responses, bool include_updates);
static Object execute(Function &&function); /**
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
* May be called from any thread.
* \param[in] request Request to the TDLib.
* \return The request response.
*/
static td_api::object_ptr<td_api::Object> execute(td_api::object_ptr<td_api::Function> &&request);
~MultiClient(); /**
* Destroys the client manager and all TDLib client instance managed by it.
*/
~ClientManager();
MultiClient(MultiClient &&other); /**
* Move constructor.
*/
ClientManager(ClientManager &&other);
MultiClient &operator=(MultiClient &&other); /**
* Move assignment operator.
*/
ClientManager &operator=(ClientManager &&other);
private: private:
friend class Client; friend class Client;

View File

@ -7552,6 +7552,7 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) {
int32 profile_photo_local_id = 0; int32 profile_photo_local_id = 0;
int32 profile_photo_dc_id = 1; int32 profile_photo_dc_id = 1;
string first_name; string first_name;
string last_name;
string username; string username;
string phone_number; string phone_number;
int32 bot_info_version = 0; int32 bot_info_version = 0;
@ -7559,6 +7560,10 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) {
if (user_id == get_service_notifications_user_id()) { if (user_id == get_service_notifications_user_id()) {
flags |= telegram_api::user::PHONE_MASK | telegram_api::user::VERIFIED_MASK | telegram_api::user::SUPPORT_MASK; flags |= telegram_api::user::PHONE_MASK | telegram_api::user::VERIFIED_MASK | telegram_api::user::SUPPORT_MASK;
first_name = "Telegram"; first_name = "Telegram";
if (G()->is_test_dc()) {
flags |= telegram_api::user::LAST_NAME_MASK;
last_name = "Notifications";
}
phone_number = "42777"; phone_number = "42777";
profile_photo_id = 3337190045231023; profile_photo_id = 3337190045231023;
profile_photo_volume_id = 107738948; profile_photo_volume_id = 107738948;
@ -12346,8 +12351,9 @@ bool ContactsManager::get_user(UserId user_id, int left_tries, Promise<Unit> &&p
return false; return false;
} }
if (user_id == UserId(777000)) { if (user_id == get_service_notifications_user_id() || user_id == get_replies_bot_user_id() ||
get_user_force(user_id); // preload 777000 synchronously user_id == get_anonymous_bot_user_id()) {
get_user_force(user_id);
} }
// TODO support loading user from database and merging it with min-user in memory // TODO support loading user from database and merging it with min-user in memory

View File

@ -12557,7 +12557,11 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
sender_user_id = UserId(); sender_user_id = UserId();
} }
if (!is_broadcast_channel(dialog_id) && td_->auth_manager_->is_bot()) { if (!is_broadcast_channel(dialog_id) && td_->auth_manager_->is_bot()) {
sender_user_id = td_->contacts_manager_->add_service_notifications_user(); if (dialog_id == sender_dialog_id) {
sender_user_id = td_->contacts_manager_->add_anonymous_bot_user();
} else {
sender_user_id = td_->contacts_manager_->add_service_notifications_user();
}
} }
} }
if (sender_dialog_id.is_valid()) { if (sender_dialog_id.is_valid()) {

View File

@ -498,6 +498,7 @@ application/x-dtbresource+xml res
application/x-dvi dvi application/x-dvi dvi
application/x-envoy evy application/x-envoy evy
application/x-eva eva application/x-eva eva
application/x-fictionbook+xml fb2
application/x-font-bdf bdf application/x-font-bdf bdf
application/x-font-ghostscript gsf application/x-font-ghostscript gsf
application/x-font-linux-psf psf application/x-font-linux-psf psf

View File

@ -917,9 +917,9 @@ TEST(Client, Multi) {
ASSERT_EQ(8 * 1000, ok_count.load()); ASSERT_EQ(8 * 1000, ok_count.load());
} }
TEST(Client, MultiNew) { TEST(Client, Manager) {
td::vector<td::thread> threads; td::vector<td::thread> threads;
td::MultiClient client; td::ClientManager client;
int threads_n = 4; int threads_n = 4;
int clients_n = 1000; int clients_n = 1000;
for (int i = 0; i < threads_n; i++) { for (int i = 0; i < threads_n; i++) {
@ -937,7 +937,7 @@ TEST(Client, MultiNew) {
std::set<int32> ids; std::set<int32> ids;
while (ids.size() != static_cast<size_t>(threads_n) * clients_n) { while (ids.size() != static_cast<size_t>(threads_n) * clients_n) {
auto event = client.receive(10); auto event = client.receive(10);
if (event.client_id != 0 && event.id == 3) { if (event.client_id != 0 && event.request_id == 3) {
ids.insert(event.client_id); ids.insert(event.client_id);
} }
} }