Remove InputQueue from Client.

GitOrigin-RevId: f3e4a39b13c99926e9f75b4f9b678a30c6150ac8
This commit is contained in:
levlam 2018-09-13 22:42:03 +03:00
parent 0a2191e6fd
commit b4a287068f

View File

@ -108,26 +108,28 @@ class Client::Impl final {
#else #else
/*** TdProxy ***/
using InputQueue = MpscPollableQueue<Client::Request>;
using OutputQueue = MpscPollableQueue<Client::Response>; using OutputQueue = MpscPollableQueue<Client::Response>;
class TdProxy : public Actor { class TdProxy : public Actor {
public: public:
TdProxy(std::shared_ptr<InputQueue> input_queue, std::shared_ptr<OutputQueue> output_queue) explicit TdProxy(std::shared_ptr<OutputQueue> output_queue) : output_queue_(std::move(output_queue)) {
: input_queue_(std::move(input_queue)), output_queue_(std::move(output_queue)) { }
void request(uint64 id, tl_object_ptr<td_api::Function> function) {
if (id == 0 && function == nullptr) {
was_hangup_ = true;
td_.reset();
return try_stop();
}
send_closure(td_, &Td::request, id, std::move(function));
} }
private: private:
std::shared_ptr<InputQueue> input_queue_;
std::shared_ptr<OutputQueue> output_queue_; std::shared_ptr<OutputQueue> output_queue_;
bool is_td_closed_ = false; bool is_td_closed_ = false;
bool was_hangup_ = false; bool was_hangup_ = false;
ActorOwn<Td> td_; ActorOwn<Td> td_;
void start_up() override { void start_up() override {
auto &fd = input_queue_->reader_get_event_fd();
Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
class Callback : public TdCallback { class Callback : public TdCallback {
public: public:
Callback(ActorId<TdProxy> parent, std::shared_ptr<OutputQueue> output_queue) Callback(ActorId<TdProxy> parent, std::shared_ptr<OutputQueue> output_queue)
@ -163,33 +165,6 @@ class TdProxy : public Actor {
Scheduler::instance()->finish(); Scheduler::instance()->finish();
stop(); stop();
} }
void loop() override {
while (true) {
int size = input_queue_->reader_wait_nonblock();
if (size == 0) {
return;
}
for (int i = 0; i < size; i++) {
auto request = input_queue_->reader_get_unsafe();
if (request.id == 0 && request.function == nullptr) {
was_hangup_ = true;
td_.reset();
return try_stop();
}
send_closure_later(td_, &Td::request, request.id, std::move(request.function));
}
}
}
void hangup() override {
UNREACHABLE();
}
void tear_down() override {
auto &fd = input_queue_->reader_get_event_fd();
Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
}
}; };
/*** Client::Impl ***/ /*** Client::Impl ***/
@ -205,7 +180,8 @@ class Client::Impl final {
return; return;
} }
input_queue_->writer_put(std::move(request)); auto guard = scheduler_->get_send_guard();
send_closure(td_proxy_, &TdProxy::request, request.id, std::move(request.function));
} }
Response receive(double timeout) { Response receive(double timeout) {
@ -222,26 +198,25 @@ class Client::Impl final {
Impl(Impl &&) = delete; Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete; Impl &operator=(Impl &&) = delete;
~Impl() { ~Impl() {
input_queue_->writer_put({0, nullptr}); auto guard = scheduler_->get_send_guard();
send_closure(td_proxy_, &TdProxy::request, 0, nullptr);
scheduler_thread_.join(); scheduler_thread_.join();
} }
private: private:
std::shared_ptr<InputQueue> input_queue_;
std::shared_ptr<OutputQueue> output_queue_; std::shared_ptr<OutputQueue> output_queue_;
std::shared_ptr<ConcurrentScheduler> scheduler_; std::shared_ptr<ConcurrentScheduler> scheduler_;
int output_queue_ready_cnt_{0}; int output_queue_ready_cnt_{0};
thread scheduler_thread_; thread scheduler_thread_;
std::atomic<bool> receive_lock_{false}; std::atomic<bool> receive_lock_{false};
ActorId<TdProxy> td_proxy_;
void init() { void init() {
input_queue_ = std::make_shared<InputQueue>();
input_queue_->init();
output_queue_ = std::make_shared<OutputQueue>(); output_queue_ = std::make_shared<OutputQueue>();
output_queue_->init(); output_queue_->init();
scheduler_ = std::make_shared<ConcurrentScheduler>(); scheduler_ = std::make_shared<ConcurrentScheduler>();
scheduler_->init(3); scheduler_->init(3);
scheduler_->create_actor_unsafe<TdProxy>(0, "TdProxy", input_queue_, output_queue_).release(); td_proxy_ = scheduler_->create_actor_unsafe<TdProxy>(0, "TdProxy", output_queue_).release();
scheduler_->start(); scheduler_->start();
scheduler_thread_ = thread([scheduler = scheduler_] { scheduler_thread_ = thread([scheduler = scheduler_] {