Minor new Client fixes.

GitOrigin-RevId: f7c455192f5d35a8cbf6271522202bfbff867698
This commit is contained in:
levlam 2020-07-30 04:04:57 +03:00
parent b1222a9bb7
commit e4ee1b7ce1
3 changed files with 64 additions and 64 deletions

View File

@ -7,6 +7,7 @@
#include "td/telegram/Client.h" #include "td/telegram/Client.h"
#include "td/telegram/Td.h" #include "td/telegram/Td.h"
#include "td/telegram/TdCallback.h"
#include "td/actor/actor.h" #include "td/actor/actor.h"
@ -20,20 +21,13 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <deque>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue>
#include <unordered_map> #include <unordered_map>
namespace td { namespace td {
class TdReceiver {
public:
virtual ~TdReceiver() = default;
virtual MultiClient::Response receive(double timeout) = 0;
virtual unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) = 0;
};
class MultiTd : public Actor { class MultiTd : public Actor {
public: public:
void create(int32 td_id, unique_ptr<TdCallback> callback) { void create(int32 td_id, unique_ptr<TdCallback> callback) {
@ -43,12 +37,12 @@ class MultiTd : public Actor {
string name = "Td"; string name = "Td";
class TdActorContext : public ActorContext { class TdActorContext : public ActorContext {
public: public:
explicit TdActorContext(std::string tag) : tag_(std::move(tag)) { explicit TdActorContext(string tag) : tag_(std::move(tag)) {
} }
int32 get_id() const override { int32 get_id() const override {
return 0x172ae58d; return 0x172ae58d;
} }
std::string tag_; string tag_;
}; };
auto context = std::make_shared<TdActorContext>(to_string(td_id)); auto context = std::make_shared<TdActorContext>(to_string(td_id));
auto old_context = set_context(context); auto old_context = set_context(context);
@ -57,26 +51,26 @@ class MultiTd : public Actor {
set_context(old_context); set_context(old_context);
set_tag(old_tag); set_tag(old_tag);
} }
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
auto &td = tds_[client_id]; auto &td = tds_[client_id];
CHECK(!td.empty()); CHECK(!td.empty());
send_closure(td, &Td::request, request_id, std::move(function)); send_closure(td, &Td::request, request_id, std::move(function));
} }
void destroy(int32 td_id) { void destroy(int32 td_id) {
auto size = tds_.erase(td_id); auto size = tds_.erase(td_id);
CHECK(size == 1); CHECK(size == 1);
} }
private: private:
std::unordered_map<int32, ActorOwn<Td> > tds_; std::unordered_map<int32, ActorOwn<Td>> tds_;
}; };
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
class TdReceiverSimple : public TdReceiver { class TdReceiver {
public: public:
TdReceiverSimple() { MultiClient::Response receive(double timeout) {
}
MultiClient::Response receive(double timeout) final {
if (!responses_.empty()) { if (!responses_.empty()) {
auto result = std::move(responses_.front()); auto result = std::move(responses_.front());
responses_.pop_front(); responses_.pop_front();
@ -84,15 +78,16 @@ class TdReceiverSimple : public TdReceiver {
} }
return {0, 0, nullptr}; return {0, 0, nullptr};
} }
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) final {
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
explicit Callback(MultiClient::ClientId client_id, TdReceiverSimple *impl) : client_id_(client_id), impl_(impl) { Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
} }
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
impl_->responses_.push_back({client_id_, id, std::move(result)}); impl_->responses_.push_back({client_id_, id, std::move(result)});
} }
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override { void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
impl_->responses_.push_back({client_id_, id, std::move(error)}); impl_->responses_.push_back({client_id_, id, std::move(error)});
} }
Callback(const Callback &) = delete; Callback(const Callback &) = delete;
@ -105,13 +100,13 @@ class TdReceiverSimple : public TdReceiver {
private: private:
MultiClient::ClientId client_id_; MultiClient::ClientId client_id_;
TdReceiverSimple *impl_; TdReceiver *impl_;
}; };
return td::make_unique<Callback>(client_id, this); return td::make_unique<Callback>(client_id, this);
} }
private: private:
std::deque<MultiClient::Response> responses_; std::queue<MultiClient::Response> responses_;
}; };
class MultiClient::Impl final { class MultiClient::Impl final {
@ -119,7 +114,7 @@ class MultiClient::Impl final {
Impl() { Impl() {
concurrent_scheduler_ = make_unique<ConcurrentScheduler>(); concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0); concurrent_scheduler_->init(0);
receiver_ = make_unique<TdReceiverSimple>(); receiver_ = make_unique<TdReceiver>();
concurrent_scheduler_->start(); concurrent_scheduler_->start();
} }
@ -162,10 +157,11 @@ class MultiClient::Impl final {
return response; return response;
} }
static Object execute(Function &&function) { Impl() = default;
return Td::static_request(std::move(function)); Impl(const Impl &) = delete;
} Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() { ~Impl() {
{ {
auto guard = concurrent_scheduler_->get_main_guard(); auto guard = concurrent_scheduler_->get_main_guard();
@ -180,8 +176,7 @@ class MultiClient::Impl final {
} }
private: private:
friend class Client::Impl; unique_ptr<TdReceiver> receiver_;
td::unique_ptr<TdReceiver> receiver_;
struct Request { struct Request {
ClientId client_id; ClientId client_id;
RequestId id; RequestId id;
@ -190,7 +185,7 @@ class MultiClient::Impl final {
std::vector<Request> requests_; std::vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_; unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0}; ClientId client_id_{0};
std::unordered_map<int32, ActorOwn<Td> > tds_; std::unordered_map<int32, ActorOwn<Td>> tds_;
}; };
class Client::Impl final { class Client::Impl final {
@ -211,11 +206,6 @@ class Client::Impl final {
return old_response; return old_response;
} }
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
private: private:
MultiClient::Impl impl_; MultiClient::Impl impl_;
MultiClient::ClientId client_id_; MultiClient::ClientId client_id_;
@ -223,13 +213,14 @@ class Client::Impl final {
#else #else
class TdReceiverTs : public TdReceiver { class TdReceiver {
public: public:
TdReceiverTs() { TdReceiver() {
output_queue_ = std::make_shared<OutputQueue>(); output_queue_ = std::make_shared<OutputQueue>();
output_queue_->init(); output_queue_->init();
} }
MultiClient::Response receive(double timeout) final {
MultiClient::Response receive(double timeout) {
if (output_queue_ready_cnt_ == 0) { if (output_queue_ready_cnt_ == 0) {
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
} }
@ -243,16 +234,17 @@ class TdReceiverTs : public TdReceiver {
} }
return {0, 0, nullptr}; return {0, 0, nullptr};
} }
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) final {
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_queue) explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_queue)
: client_id_(client_id), output_queue_(std::move(output_queue)) { : client_id_(client_id), output_queue_(std::move(output_queue)) {
} }
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override { void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
output_queue_->writer_put({client_id_, id, std::move(result)}); output_queue_->writer_put({client_id_, id, std::move(result)});
} }
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override { void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
output_queue_->writer_put({client_id_, id, std::move(error)}); output_queue_->writer_put({client_id_, id, std::move(error)});
} }
Callback(const Callback &) = delete; Callback(const Callback &) = delete;
@ -304,10 +296,6 @@ class MultiImpl {
return id; return id;
} }
[[deprecated]] void send(int32 td_id, Client::Request request) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::send, td_id, request.id, std::move(request.function));
}
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) { void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
auto guard = concurrent_scheduler_->get_send_guard(); auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function)); send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function));
@ -333,9 +321,9 @@ class MultiImpl {
thread scheduler_thread_; thread scheduler_thread_;
ActorOwn<MultiTd> multi_td_; ActorOwn<MultiTd> multi_td_;
int32 create_id() { static int32 create_id() {
static std::atomic<int32> id_{0}; static std::atomic<int32> current_id{1};
return id_.fetch_add(1) + 1; return current_id.fetch_add(1);
} }
void create(int32 td_id, unique_ptr<TdCallback> callback) { void create(int32 td_id, unique_ptr<TdCallback> callback) {
@ -363,7 +351,7 @@ class MultiImplPool {
private: private:
std::mutex mutex_; std::mutex mutex_;
std::vector<std::weak_ptr<MultiImpl> > impls_; std::vector<std::weak_ptr<MultiImpl>> impls_;
}; };
class MultiClient::Impl final { class MultiClient::Impl final {
@ -377,12 +365,14 @@ class MultiClient::Impl final {
} }
return client_id; return client_id;
} }
void send(ClientId client_id, RequestId request_id, Function function) { void send(ClientId client_id, RequestId request_id, Function function) {
auto lock = impls_mutex_.lock_read().move_as_ok(); auto lock = impls_mutex_.lock_read().move_as_ok();
auto it = impls_.find(client_id); auto it = impls_.find(client_id);
CHECK(it != impls_.end()); CHECK(it != impls_.end());
it->second->send(client_id, request_id, std::move(function)); it->second->send(client_id, request_id, std::move(function));
} }
Response receive(double timeout) { Response receive(double timeout) {
auto res = receiver_->receive(timeout); auto res = receiver_->receive(timeout);
if (res.client_id != 0 && !res.object) { if (res.client_id != 0 && !res.object) {
@ -391,10 +381,12 @@ class MultiClient::Impl final {
} }
return res; return res;
} }
static Object execute(Function &&function) {
return Td::static_request(std::move(function));
}
Impl() = default;
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() { ~Impl() {
for (auto &it : impls_) { for (auto &it : impls_) {
it.second->destroy(it.first); it.second->destroy(it.first);
@ -406,9 +398,9 @@ class MultiClient::Impl final {
private: private:
MultiImplPool pool_; MultiImplPool pool_;
td::RwMutex impls_mutex_; RwMutex impls_mutex_;
std::unordered_map<ClientId, std::shared_ptr<MultiImpl> > impls_; std::unordered_map<ClientId, std::shared_ptr<MultiImpl>> impls_;
td::unique_ptr<TdReceiver> receiver_{td::make_unique<TdReceiverTs>()}; unique_ptr<TdReceiver> receiver_{make_unique<TdReceiver>()};
}; };
class Client::Impl final { class Client::Impl final {
@ -416,7 +408,7 @@ class Client::Impl final {
Impl() { Impl() {
static MultiImplPool pool; static MultiImplPool pool;
multi_impl_ = pool.get(); multi_impl_ = pool.get();
receiver_ = make_unique<TdReceiverTs>(); receiver_ = make_unique<TdReceiver>();
td_id_ = multi_impl_->create(*receiver_); td_id_ = multi_impl_->create(*receiver_);
} }
@ -455,7 +447,7 @@ class Client::Impl final {
private: private:
std::shared_ptr<MultiImpl> multi_impl_; std::shared_ptr<MultiImpl> multi_impl_;
td::unique_ptr<TdReceiver> receiver_; unique_ptr<TdReceiver> receiver_;
bool is_closed_{false}; bool is_closed_{false};
int32 td_id_; int32 td_id_;
@ -492,14 +484,17 @@ MultiClient::MultiClient() : impl_(std::make_unique<Impl>()) {
MultiClient::ClientId MultiClient::create_client() { MultiClient::ClientId MultiClient::create_client() {
return impl_->create_client(); return impl_->create_client();
} }
void MultiClient::send(ClientId client_id, RequestId request_id, Function function) {
void MultiClient::send(ClientId client_id, RequestId request_id, Function &&function) {
impl_->send(client_id, request_id, std::move(function)); impl_->send(client_id, request_id, std::move(function));
} }
MultiClient::Response MultiClient::receive(double timeout) { MultiClient::Response MultiClient::receive(double timeout) {
return impl_->receive(timeout); return impl_->receive(timeout);
} }
MultiClient::Object MultiClient::execute(Function &&function) { MultiClient::Object MultiClient::execute(Function &&function) {
return Impl::execute(std::move(function)); return Td::static_request(std::move(function));
} }
MultiClient::~MultiClient() = default; MultiClient::~MultiClient() = default;

View File

@ -127,7 +127,6 @@ class Client final {
Client &operator=(Client &&other); Client &operator=(Client &&other);
private: private:
friend class MultiClient;
class Impl; class Impl;
std::unique_ptr<Impl> impl_; std::unique_ptr<Impl> impl_;
}; };
@ -148,16 +147,20 @@ class MultiClient final {
}; };
ClientId create_client(); ClientId create_client();
void send(ClientId client_id, RequestId request_id, Function function);
void send(ClientId client_id, RequestId request_id, Function &&function);
Response receive(double timeout); Response receive(double timeout);
static Object execute(Function &&function); static Object execute(Function &&function);
~MultiClient(); ~MultiClient();
MultiClient(MultiClient &&other); MultiClient(MultiClient &&other);
MultiClient &operator=(MultiClient &&other); MultiClient &operator=(MultiClient &&other);
private: private:
friend class Client;
class Impl; class Impl;
std::unique_ptr<Impl> impl_; std::unique_ptr<Impl> impl_;
}; };

View File

@ -34,6 +34,7 @@
#include <functional> #include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <set>
#include <utility> #include <utility>
REGISTER_TESTS(tdclient); REGISTER_TESTS(tdclient);
@ -875,7 +876,7 @@ TEST(Client, SimpleMulti) {
#if !TD_THREAD_UNSUPPORTED #if !TD_THREAD_UNSUPPORTED
TEST(Client, Multi) { TEST(Client, Multi) {
std::vector<td::thread> threads; td::vector<td::thread> threads;
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
threads.emplace_back([] { threads.emplace_back([] {
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -895,8 +896,9 @@ TEST(Client, Multi) {
thread.join(); thread.join();
} }
} }
TEST(Client, MultiNew) { TEST(Client, MultiNew) {
std::vector<td::thread> threads; td::vector<td::thread> threads;
td::MultiClient client; td::MultiClient client;
int threads_n = 4; int threads_n = 4;
int clients_n = 1000; int clients_n = 1000;
@ -912,7 +914,7 @@ TEST(Client, MultiNew) {
thread.join(); thread.join();
} }
std::set<std::int32_t> ids; std::set<int32> ids;
while (ids.size() * threads_n * clients_n) { while (ids.size() * threads_n * clients_n) {
auto event = client.receive(10); auto event = client.receive(10);
if (event.client_id != 0 && event.id == 3) { if (event.client_id != 0 && event.id == 3) {