Merge commit '7207d76a809598d019e17fb04b2edc46789c6e22'

Conflicts:
	td/telegram/Client.cpp
This commit is contained in:
Andrea Cavalli 2020-10-12 16:12:59 +02:00
commit 0da9b62eda
12 changed files with 214 additions and 77 deletions

View File

@ -101,18 +101,18 @@ class TdReceiver {
class ClientManager::Impl final {
public:
Impl() {
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
receiver_ = make_unique<TdReceiver>();
concurrent_scheduler_->start();
}
ClientId create_client() {
if (tds_.empty()) {
CHECK(concurrent_scheduler_ == nullptr);
CHECK(options_.net_query_stats == nullptr);
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
concurrent_scheduler_->start();
}
auto client_id = ++client_id_;
tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id), options_);
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_.create_callback(client_id), options_);
return client_id;
}
@ -126,29 +126,31 @@ class ClientManager::Impl final {
Response receive(double timeout, bool include_responses, bool include_updates) {
if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_main_guard();
for (size_t i = 0; i < requests_.size(); i++) {
auto &request = requests_[i];
if (request.client_id <= 0 || request.client_id > client_id_) {
receiver_->add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
receiver_.add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
continue;
}
auto it = tds_.find(request.client_id);
if (it == tds_.end() || it->second.empty()) {
receiver_->add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(500, "Request aborted"));
receiver_.add_response(request.client_id, request.id,
td_api::make_object<td_api::error>(500, "Request aborted"));
continue;
}
CHECK(concurrent_scheduler_ != nullptr);
auto guard = concurrent_scheduler_->get_main_guard();
send_closure_later(it->second, &Td::request, request.id, std::move(request.request));
}
requests_.clear();
}
auto response = receiver_->receive(0, include_responses, include_updates);
if (response.client_id == 0) {
auto response = receiver_.receive(0, include_responses, include_updates);
if (response.client_id == 0 && concurrent_scheduler_ != nullptr) {
concurrent_scheduler_->run_main(0);
response = receiver_->receive(0, include_responses, include_updates);
response = receiver_.receive(0, include_responses, include_updates);
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
}
@ -156,18 +158,32 @@ class ClientManager::Impl final {
response.object->get_id() == td_api::updateAuthorizationState::ID &&
static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() ==
td_api::authorizationStateClosed::ID) {
CHECK(concurrent_scheduler_ != nullptr);
auto guard = concurrent_scheduler_->get_main_guard();
auto it = tds_.find(response.client_id);
CHECK(it != tds_.end());
it->second.reset();
response.client_id = 0;
response.object = nullptr;
}
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto guard = concurrent_scheduler_->get_main_guard();
auto it = tds_.find(response.client_id);
CHECK(it != tds_.end());
CHECK(it->second.empty());
tds_.erase(it);
response.client_id = 0;
response.object = td_api::make_object<td_api::updateAuthorizationState>(
td_api::make_object<td_api::authorizationStateClosed>());
if (tds_.empty()) {
CHECK(options_.net_query_stats.use_count() == 1);
CHECK(options_.net_query_stats->get_count() == 0);
options_.net_query_stats = nullptr;
concurrent_scheduler_->finish();
concurrent_scheduler_ = nullptr;
reset_to_empty(tds_);
}
}
return response;
}
@ -177,6 +193,10 @@ class ClientManager::Impl final {
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() {
if (concurrent_scheduler_ == nullptr) {
return;
}
{
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &td : tds_) {
@ -190,7 +210,7 @@ class ClientManager::Impl final {
}
private:
unique_ptr<TdReceiver> receiver_;
TdReceiver receiver_;
struct Request {
ClientId client_id;
RequestId id;
@ -485,6 +505,8 @@ class MultiImplPool {
init_openssl_threads();
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
net_query_stats_ = std::make_shared<NetQueryStats>();
}
auto &impl = *std::min_element(impls_.begin(), impls_.end(),
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
@ -496,17 +518,35 @@ class MultiImplPool {
return result;
}
void try_clear() {
std::unique_lock<std::mutex> lock(mutex_);
if (impls_.empty()) {
return;
}
for (auto &impl : impls_) {
if (impl.lock().use_count() != 0) {
return;
}
}
reset_to_empty(impls_);
CHECK(net_query_stats_.use_count() == 1);
CHECK(net_query_stats_->get_count() == 0);
net_query_stats_ = nullptr;
}
private:
std::mutex mutex_;
std::vector<std::weak_ptr<MultiImpl>> impls_;
std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>();
std::shared_ptr<NetQueryStats> net_query_stats_;
};
class ClientManager::Impl final {
public:
ClientId create_client() {
auto impl = pool_.get();
auto client_id = impl->create(*receiver_);
auto client_id = impl->create(receiver_);
{
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_[client_id].impl = std::move(impl);
@ -517,13 +557,13 @@ class ClientManager::Impl final {
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
auto lock = impls_mutex_.lock_read().move_as_ok();
if (!MultiImpl::is_valid_client_id(client_id)) {
receiver_->add_response(client_id, request_id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
receiver_.add_response(client_id, request_id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
return;
}
auto it = impls_.find(client_id);
if (it == impls_.end() || it->second.is_closed) {
receiver_->add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted"));
receiver_.add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted"));
return;
}
it->second.impl->send(client_id, request_id, std::move(request));
@ -534,13 +574,16 @@ class ClientManager::Impl final {
}
Response receive(double timeout, bool include_responses, bool include_updates) {
auto response = receiver_->receive(timeout, include_responses, include_updates);
auto response = receiver_.receive(timeout, include_responses, include_updates);
if (response.request_id == 0 && response.object != nullptr &&
response.object->get_id() == td_api::updateAuthorizationState::ID &&
static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() ==
td_api::authorizationStateClosed::ID) {
auto lock = impls_mutex_.lock_write().move_as_ok();
close_impl(response.client_id);
response.client_id = 0;
response.object = nullptr;
}
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto lock = impls_mutex_.lock_write().move_as_ok();
@ -548,7 +591,14 @@ class ClientManager::Impl final {
CHECK(it != impls_.end());
CHECK(it->second.is_closed);
impls_.erase(it);
response.client_id = 0;
response.object = td_api::make_object<td_api::updateAuthorizationState>(
td_api::make_object<td_api::authorizationStateClosed>());
if (impls_.empty()) {
reset_to_empty(impls_);
pool_.try_clear();
}
}
return response;
}
@ -584,7 +634,7 @@ class ClientManager::Impl final {
bool is_closed = false;
};
std::unordered_map<ClientId, MultiImplInfo> impls_;
unique_ptr<TdReceiver> receiver_{make_unique<TdReceiver>()};
TdReceiver receiver_;
};
class Client::Impl final {
@ -592,8 +642,7 @@ class Client::Impl final {
Impl() {
static MultiImplPool pool;
multi_impl_ = pool.get();
receiver_ = make_unique<TdReceiver>();
td_id_ = multi_impl_->create(*receiver_);
td_id_ = multi_impl_->create(receiver_);
}
void send(Request request) {
@ -610,7 +659,7 @@ class Client::Impl final {
}
Response receive(double timeout, bool include_responses, bool include_updates) {
auto response = receiver_->receive(timeout, include_responses, include_updates);
auto response = receiver_.receive(timeout, include_responses, include_updates);
Response old_response;
old_response.id = response.request_id;
@ -625,7 +674,7 @@ class Client::Impl final {
~Impl() {
multi_impl_->close(td_id_);
while (true) {
auto response = receiver_->receive(10.0, false, true);
auto response = receiver_.receive(10.0, false, true);
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
break;
}
@ -634,7 +683,7 @@ class Client::Impl final {
private:
std::shared_ptr<MultiImpl> multi_impl_;
unique_ptr<TdReceiver> receiver_;
TdReceiver receiver_;
int32 td_id_;
};

View File

@ -12233,6 +12233,11 @@ bool ContactsManager::is_user_deleted(UserId user_id) const {
return u == nullptr || u->is_deleted;
}
bool ContactsManager::is_user_support(UserId user_id) const {
auto u = get_user(user_id);
return u != nullptr && !u->is_deleted && u->is_support;
}
bool ContactsManager::is_user_bot(UserId user_id) const {
auto u = get_user(user_id);
return u != nullptr && !u->is_deleted && u->is_bot;
@ -12266,6 +12271,11 @@ Result<BotData> ContactsManager::get_bot_data(UserId user_id) const {
return bot_data;
}
bool ContactsManager::is_user_online(UserId user_id) const {
int32 was_online = get_user_was_online(get_user(user_id), user_id);
return was_online > G()->unix_time();
}
bool ContactsManager::is_user_status_exact(UserId user_id) const {
auto u = get_user(user_id);
return u != nullptr && !u->is_deleted && !u->is_bot && u->was_online > 0;

View File

@ -427,9 +427,13 @@ class ContactsManager : public Actor {
bool is_user_deleted(UserId user_id) const;
bool is_user_support(UserId user_id) const;
bool is_user_bot(UserId user_id) const;
Result<BotData> get_bot_data(UserId user_id) const TD_WARN_UNUSED_RESULT;
bool is_user_online(UserId user_id) const;
bool is_user_status_exact(UserId user_id) const;
bool can_report_user(UserId user_id) const;

View File

@ -313,7 +313,6 @@ void CountryInfoManager::on_get_country_list(const string &language_code,
void CountryInfoManager::on_get_country_list_impl(const string &language_code,
tl_object_ptr<telegram_api::help_CountriesList> country_list) {
LOG(ERROR) << to_string(country_list);
CHECK(country_list != nullptr);
auto &countries = countries_[language_code];
switch (country_list->get_id()) {

View File

@ -22188,7 +22188,7 @@ MessageId MessagesManager::get_reply_to_message_id(Dialog *d, MessageId top_thre
return message_id;
}
if (top_thread_message_id.is_valid() && top_thread_message_id.is_server() &&
get_message_force(d, top_thread_message_id, "get_reply_to_message_id 1") != nullptr) {
get_message_force(d, top_thread_message_id, "get_reply_to_message_id 3") != nullptr) {
return top_thread_message_id;
}
@ -28882,10 +28882,20 @@ bool MessagesManager::is_dialog_action_unneeded(DialogId dialog_id) const {
UserId user_id = dialog_type == DialogType::User
? dialog_id.get_user_id()
: td_->contacts_manager_->get_secret_chat_user_id(dialog_id.get_secret_chat_id());
if (!user_id.is_valid() || td_->contacts_manager_->is_user_bot(user_id) ||
td_->contacts_manager_->is_user_deleted(user_id)) {
if (td_->contacts_manager_->is_user_deleted(user_id)) {
return true;
}
if (td_->contacts_manager_->is_user_bot(user_id) && !td_->contacts_manager_->is_user_support(user_id)) {
return true;
}
if (user_id == td_->contacts_manager_->get_my_id()) {
return true;
}
if (!td_->auth_manager_->is_bot() && !td_->contacts_manager_->is_user_online(user_id)) {
return true;
}
if (!td_->auth_manager_->is_bot() && !td_->contacts_manager_->is_user_status_exact(user_id)) {
// return true;
}

View File

@ -3412,7 +3412,6 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
request_set_.insert(id);
if (function == nullptr) {
LOG(ERROR) << "Receive empty request";
return send_error_impl(id, make_error(400, "Request is empty"));
}
@ -3511,7 +3510,6 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
td_api::object_ptr<td_api::Object> Td::static_request(td_api::object_ptr<td_api::Function> function) {
if (function == nullptr) {
LOG(ERROR) << "Receive empty static request";
return td_api::make_object<td_api::error>(400, "Request is empty");
}

View File

@ -266,13 +266,13 @@ class Td final : public NetQueryCallback {
void dec_stop_cnt();
unique_ptr<TdCallback> callback_;
Options td_options_;
MtprotoHeader::Options options_;
TdParameters parameters_;
unique_ptr<TdCallback> callback_;
Options td_options_;
StateManager::State connection_state_;
std::unordered_multiset<uint64> request_set_;

View File

@ -87,6 +87,7 @@ set(TDUTILS_SOURCE
td/utils/BufferedUdp.cpp
td/utils/check.cpp
td/utils/crypto.cpp
td/utils/ExitGuard.cpp
td/utils/FileLog.cpp
td/utils/filesystem.cpp
td/utils/find_boundary.cpp
@ -187,6 +188,7 @@ set(TDUTILS_SOURCE
td/utils/Destructor.h
td/utils/Enumerator.h
td/utils/EpochBasedMemoryReclamation.h
td/utils/ExitGuard.h
td/utils/FileLog.h
td/utils/filesystem.h
td/utils/find_boundary.h

View File

@ -0,0 +1,13 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/utils/ExitGuard.h"
namespace td {
std::atomic<bool> ExitGuard::is_exited_{false};
} // namespace td

View File

@ -0,0 +1,32 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include <atomic>
namespace td {
class ExitGuard {
public:
ExitGuard() = default;
ExitGuard(ExitGuard &&) = delete;
ExitGuard &operator=(ExitGuard &&) = delete;
ExitGuard(const ExitGuard &) = delete;
ExitGuard &operator=(const ExitGuard &) = delete;
~ExitGuard() {
is_exited_.store(true, std::memory_order_relaxed);
}
static bool is_exited() {
return is_exited_.load(std::memory_order_relaxed);
}
private:
static std::atomic<bool> is_exited_;
};
} // namespace td

View File

@ -10,6 +10,7 @@
#include "td/utils/bits.h"
#include "td/utils/CancellationToken.h"
#include "td/utils/common.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/Hash.h"
#include "td/utils/HashMap.h"
#include "td/utils/HashSet.h"
@ -50,6 +51,24 @@
using namespace td;
struct CheckExitGuard {
explicit CheckExitGuard(bool expected_value): expected_value_(expected_value) {
}
CheckExitGuard(CheckExitGuard &&) = delete;
CheckExitGuard &operator=(CheckExitGuard &&) = delete;
CheckExitGuard(const CheckExitGuard &) = delete;
CheckExitGuard &operator=(const CheckExitGuard &) = delete;
~CheckExitGuard() {
ASSERT_EQ(expected_value_, ExitGuard::is_exited());
}
bool expected_value_;
};
CheckExitGuard check_exit_guard_true{true};
ExitGuard exit_guard;
CheckExitGuard check_exit_guard_false{false};
#if TD_LINUX || TD_DARWIN
TEST(Misc, update_atime_saves_mtime) {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));

View File

@ -1092,7 +1092,6 @@ TEST(Client, ManagerClose) {
#endif
TEST(Client, ManagerCloseOneThread) {
SET_VERBOSITY_LEVEL(2);
td::ClientManager client_manager;
td::uint64 request_id = 2;
@ -1133,42 +1132,44 @@ TEST(Client, ManagerCloseOneThread) {
}
};
for (td::int32 i = -5; i <= 0; i++) {
send_request(i, 400);
for (int t = 0; t < 3; t++) {
for (td::int32 i = -5; i <= 0; i++) {
send_request(i, 400);
}
receive();
auto client_id = client_manager.create_client();
for (td::int32 i = -5; i < 5; i++) {
send_request(i, i == client_id ? 0 : (i > 0 && i < client_id ? 500 : 400));
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 0);
}
receive();
sent_count++;
sent_requests.emplace(1, 0);
client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
}
receive();
auto client_id = client_manager.create_client();
for (td::int32 i = -5; i < 5; i++) {
send_request(i, i == client_id ? 0 : (i > 0 && i < client_id ? 500 : 400));
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 0);
}
receive();
sent_count++;
sent_requests.emplace(1, 0);
client_manager.send(client_id, 1, td::make_tl_object<td::td_api::close>());
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
for (int i = 0; i < 10; i++) {
send_request(client_id, 500);
}
receive();
ASSERT_TRUE(sent_requests.empty());
}