Don't create client through ClientManager until first request is sent to it.

This commit is contained in:
levlam 2020-11-12 14:45:18 +03:00
parent 597c0d2983
commit 6a9cd72636
7 changed files with 66 additions and 46 deletions

View File

@ -78,6 +78,14 @@ class TdReceiver {
class ClientManager::Impl final { class ClientManager::Impl final {
public: public:
ClientId create_client() { ClientId create_client() {
CHECK(client_id_ != std::numeric_limits<ClientId>::max());
auto client_id = ++client_id_;
pending_clients_.insert(client_id);
return client_id;
}
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
if (pending_clients_.erase(client_id) != 0) {
if (tds_.empty()) { if (tds_.empty()) {
CHECK(concurrent_scheduler_ == nullptr); CHECK(concurrent_scheduler_ == nullptr);
CHECK(options_.net_query_stats == nullptr); CHECK(options_.net_query_stats == nullptr);
@ -86,14 +94,9 @@ class ClientManager::Impl final {
concurrent_scheduler_->init(0); concurrent_scheduler_->init(0);
concurrent_scheduler_->start(); concurrent_scheduler_->start();
} }
CHECK(client_id_ != std::numeric_limits<ClientId>::max());
auto client_id = ++client_id_;
tds_[client_id] = tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_.create_callback(client_id), options_); concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_.create_callback(client_id), options_);
return client_id;
} }
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
requests_.push_back({client_id, request_id, std::move(request)}); requests_.push_back({client_id, request_id, std::move(request)});
} }
@ -196,6 +199,7 @@ class ClientManager::Impl final {
unique_ptr<ConcurrentScheduler> concurrent_scheduler_; unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0}; ClientId client_id_{0};
Td::Options options_; Td::Options options_;
std::unordered_set<int32> pending_clients_;
std::unordered_map<int32, ActorOwn<Td>> tds_; std::unordered_map<int32, ActorOwn<Td>> tds_;
}; };
@ -356,10 +360,15 @@ class MultiImpl {
MultiImpl(MultiImpl &&) = delete; MultiImpl(MultiImpl &&) = delete;
MultiImpl &operator=(MultiImpl &&) = delete; MultiImpl &operator=(MultiImpl &&) = delete;
int32 create(TdReceiver &receiver) { static int32 create_id() {
auto id = create_id(); auto result = current_id_.fetch_add(1);
create(id, receiver.create_callback(id)); CHECK(result <= static_cast<uint32>(std::numeric_limits<int32>::max()));
return id; return static_cast<int32>(result);
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
} }
static bool is_valid_client_id(int32 client_id) { static bool is_valid_client_id(int32 client_id) {
@ -395,17 +404,6 @@ class MultiImpl {
ActorOwn<MultiTd> multi_td_; ActorOwn<MultiTd> multi_td_;
static std::atomic<uint32> current_id_; static std::atomic<uint32> current_id_;
static int32 create_id() {
auto result = current_id_.fetch_add(1);
CHECK(result <= static_cast<uint32>(std::numeric_limits<int32>::max()));
return static_cast<int32>(result);
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
}
}; };
std::atomic<uint32> MultiImpl::current_id_{1}; std::atomic<uint32> MultiImpl::current_id_{1};
@ -458,11 +456,10 @@ class MultiImplPool {
class ClientManager::Impl final { class ClientManager::Impl final {
public: public:
ClientId create_client() { ClientId create_client() {
auto impl = pool_.get(); auto client_id = MultiImpl::create_id();
auto client_id = impl->create(receiver_);
{ {
auto lock = impls_mutex_.lock_write().move_as_ok(); auto lock = impls_mutex_.lock_write().move_as_ok();
impls_[client_id].impl = std::move(impl); impls_[client_id]; // create empty MultiImplInfo
} }
return client_id; return client_id;
} }
@ -474,7 +471,22 @@ class ClientManager::Impl final {
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified")); td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
return; return;
} }
auto it = impls_.find(client_id); auto it = impls_.find(client_id);
if (it != impls_.end() && it->second.impl == nullptr) {
lock.reset();
auto write_lock = impls_mutex_.lock_write().move_as_ok();
it = impls_.find(client_id);
if (it != impls_.end() && it->second.impl == nullptr) {
it->second.impl = pool_.get();
it->second.impl->create(client_id, receiver_.create_callback(client_id));
}
write_lock.reset();
lock = impls_mutex_.lock_read().move_as_ok();
it = impls_.find(client_id);
}
if (it == impls_.end() || it->second.is_closed) { if (it == impls_.end() || it->second.is_closed) {
receiver_.add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted")); receiver_.add_response(client_id, request_id, td_api::make_object<td_api::error>(500, "Request aborted"));
return; return;
@ -517,9 +529,13 @@ class ClientManager::Impl final {
CHECK(it != impls_.end()); CHECK(it != impls_.end());
if (!it->second.is_closed) { if (!it->second.is_closed) {
it->second.is_closed = true; it->second.is_closed = true;
if (it->second.impl == nullptr) {
receiver_.add_response(client_id, 0, nullptr);
} else {
it->second.impl->close(client_id); it->second.impl->close(client_id);
} }
} }
}
Impl() = default; Impl() = default;
Impl(const Impl &) = delete; Impl(const Impl &) = delete;
@ -554,7 +570,8 @@ class Client::Impl final {
Impl() { Impl() {
static MultiImplPool pool; static MultiImplPool pool;
multi_impl_ = pool.get(); multi_impl_ = pool.get();
td_id_ = multi_impl_->create(receiver_); td_id_ = MultiImpl::create_id();
multi_impl_->create(td_id_, receiver_.create_callback(td_id_));
} }
void send(Request request) { void send(Request request) {

View File

@ -184,6 +184,7 @@ class ClientManager final {
/** /**
* Creates a new TDLib client and returns its opaque identifier. * Creates a new TDLib client and returns its opaque identifier.
* The client will not send updates until the first request is sent to it.
*/ */
ClientId create_client(); ClientId create_client();

View File

@ -125,11 +125,11 @@ static std::mutex extra_mutex;
static std::unordered_map<int64, string> extra; static std::unordered_map<int64, string> extra;
static std::atomic<uint64> extra_id{1}; static std::atomic<uint64> extra_id{1};
int td_json_create_client() { int json_create_client() {
return static_cast<int>(get_manager()->create_client()); return static_cast<int>(get_manager()->create_client());
} }
void td_json_send(int client_id, Slice request) { void json_send(int client_id, Slice request) {
auto parsed_request = to_request(request); auto parsed_request = to_request(request);
auto request_id = extra_id.fetch_add(1, std::memory_order_relaxed); auto request_id = extra_id.fetch_add(1, std::memory_order_relaxed);
if (!parsed_request.second.empty()) { if (!parsed_request.second.empty()) {
@ -139,7 +139,7 @@ void td_json_send(int client_id, Slice request) {
get_manager()->send(client_id, request_id, std::move(parsed_request.first)); get_manager()->send(client_id, request_id, std::move(parsed_request.first));
} }
const char *td_json_receive(double timeout) { const char *json_receive(double timeout) {
auto response = get_manager()->receive(timeout); auto response = get_manager()->receive(timeout);
if (!response.object) { if (!response.object) {
return nullptr; return nullptr;
@ -157,7 +157,7 @@ const char *td_json_receive(double timeout) {
return store_string(from_response(*response.object, extra_str, response.client_id)); return store_string(from_response(*response.object, extra_str, response.client_id));
} }
const char *td_json_execute(Slice request) { const char *json_execute(Slice request) {
auto parsed_request = to_request(request); auto parsed_request = to_request(request);
return store_string( return store_string(
from_response(*ClientManager::execute(std::move(parsed_request.first)), parsed_request.second, 0)); from_response(*ClientManager::execute(std::move(parsed_request.first)), parsed_request.second, 0));

View File

@ -33,12 +33,12 @@ class ClientJson final {
std::atomic<std::uint64_t> extra_id_{1}; std::atomic<std::uint64_t> extra_id_{1};
}; };
int td_json_create_client(); int json_create_client();
void td_json_send(int client_id, Slice request); void json_send(int client_id, Slice request);
const char *td_json_receive(double timeout); const char *json_receive(double timeout);
const char *td_json_execute(Slice request); const char *json_execute(Slice request);
} // namespace td } // namespace td

View File

@ -31,17 +31,17 @@ const char *td_json_client_execute(void *client, const char *request) {
} }
int td_create_client() { int td_create_client() {
return td::td_json_create_client(); return td::json_create_client();
} }
void td_send(int client_id, const char *request) { void td_send(int client_id, const char *request) {
td::td_json_send(client_id, td::Slice(request == nullptr ? "" : request)); td::json_send(client_id, td::Slice(request == nullptr ? "" : request));
} }
const char *td_receive(double timeout) { const char *td_receive(double timeout) {
return td::td_json_receive(timeout); return td::json_receive(timeout);
} }
const char *td_execute(const char *request) { const char *td_execute(const char *request) {
return td::td_json_execute(td::Slice(request == nullptr ? "" : request)); return td::json_execute(td::Slice(request == nullptr ? "" : request));
} }

View File

@ -126,8 +126,8 @@ TDJSON_EXPORT void td_json_client_destroy(void *client);
*/ */
/** /**
* Creates a new instance of TDLib. * Creates a new instance of TDLib. The TDLib instance will not send updates until the first request is sent to it.
* \return Opaque indentifier of the created TDLib client. * \return Opaque indentifier of the created TDLib instance.
*/ */
TDJSON_EXPORT int td_create_client(); TDJSON_EXPORT int td_create_client();

View File

@ -941,10 +941,12 @@ TEST(Client, Manager) {
client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3)); client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
for (int i = 0; i < threads_n; i++) { for (int i = 0; i < threads_n; i++) {
threads.emplace_back([&] { threads.emplace_back([&] {
for (int i = 0; i < clients_n; i++) { for (int i = 0; i <= clients_n; i++) {
auto id = client.create_client(); auto id = client.create_client();
if (i != 0) {
client.send(id, 3, td::make_tl_object<td::td_api::testSquareInt>(3)); client.send(id, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
} }
}
}); });
} }
for (auto &thread : threads) { for (auto &thread : threads) {