Minor Client fixes.

GitOrigin-RevId: 039c9affda4f3b70eacb545c05b99cfd6523c5e1
This commit is contained in:
levlam 2019-03-22 01:42:41 +03:00
parent 18900e9d69
commit 0bd7881fb7

View File

@ -18,6 +18,9 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace td { namespace td {
@ -108,28 +111,28 @@ class Client::Impl final {
class MultiTd : public Actor { class MultiTd : public Actor {
public: public:
void create(int td_id, unique_ptr<TdCallback> callback) { void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto &td = tds_[td_id]; auto &td = tds_[td_id];
CHECK(td.empty()); CHECK(td.empty());
string name = "Td"; string name = "Td";
if (td_id != 0) { if (td_id != 0) {
name += PSTRING() << "#" << td_id; name += PSTRING() << '#' << td_id;
} }
td = create_actor<Td>(name, std::move(callback)); td = create_actor<Td>(name, std::move(callback));
} }
void send(int td_id, Client::Request request) { void send(int32 td_id, Client::Request request) {
auto &td = tds_[td_id]; auto &td = tds_[td_id];
CHECK(!td.empty()); CHECK(!td.empty());
send_closure(td, &Td::request, request.id, std::move(request.function)); send_closure(td, &Td::request, request.id, std::move(request.function));
} }
void destroy(int td_id) { void destroy(int32 td_id) {
auto size = tds_.erase(td_id); auto size = tds_.erase(td_id);
CHECK(size == 1); CHECK(size == 1);
} }
private: private:
std::unordered_map<int, ActorOwn<Td> > tds_; std::unordered_map<int32, ActorOwn<Td> > tds_;
}; };
class MultiImpl { class MultiImpl {
@ -145,6 +148,7 @@ class MultiImpl {
} }
return res; return res;
} }
MultiImpl() { MultiImpl() {
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>(); concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
concurrent_scheduler_->init(3); concurrent_scheduler_->init(3);
@ -161,17 +165,25 @@ class MultiImpl {
concurrent_scheduler->finish(); concurrent_scheduler->finish();
}); });
} }
MultiImpl(const MultiImpl &) = delete;
MultiImpl &operator=(const MultiImpl &) = delete;
MultiImpl(MultiImpl &&) = delete;
MultiImpl &operator=(MultiImpl &&) = delete;
int32 create_id() { int32 create_id() {
return id_.fetch_add(1) + 1; return id_.fetch_add(1) + 1;
} }
void create(int32 td_id, td::unique_ptr<TdCallback> callback) {
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto guard = concurrent_scheduler_->get_send_guard(); auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback)); send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
} }
void send(int32 td_id, Client::Request request) { void send(int32 td_id, Client::Request request) {
auto guard = concurrent_scheduler_->get_send_guard(); auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::send, td_id, std::move(request)); send_closure(multi_td_, &MultiTd::send, td_id, std::move(request));
} }
void destroy(int32 td_id) { void destroy(int32 td_id) {
auto guard = concurrent_scheduler_->get_send_guard(); auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::destroy, td_id); send_closure(multi_td_, &MultiTd::destroy, td_id);
@ -188,12 +200,11 @@ class MultiImpl {
private: private:
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_; std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
td::thread scheduler_thread_; thread scheduler_thread_;
td::ActorOwn<MultiTd> multi_td_; ActorOwn<MultiTd> multi_td_;
std::atomic<int32> id_{0}; std::atomic<int32> id_{0};
}; };
/*** Client::Impl ***/
class Client::Impl final { class Client::Impl final {
public: public:
using OutputQueue = MpscPollableQueue<Client::Response>; using OutputQueue = MpscPollableQueue<Client::Response>;
@ -275,7 +286,7 @@ class Client::Impl final {
if (output_queue_ready_cnt_ > 0) { if (output_queue_ready_cnt_ > 0) {
output_queue_ready_cnt_--; output_queue_ready_cnt_--;
auto res = output_queue_->reader_get_unsafe(); auto res = output_queue_->reader_get_unsafe();
if (!res.object) { if (res.object == nullptr && res.id == 0) {
is_closed_ = true; is_closed_ = true;
} }
return res; return res;
@ -289,7 +300,6 @@ class Client::Impl final {
}; };
#endif #endif
/*** Client ***/
Client::Client() : impl_(std::make_unique<Impl>()) { Client::Client() : impl_(std::make_unique<Impl>()) {
// At least it should be enough for everybody who uses TDLib // At least it should be enough for everybody who uses TDLib
init_openssl_threads(); init_openssl_threads();