Added optional separation between updates queue and responses queue
This commit is contained in:
parent
510aec55de
commit
c04e7bcd47
@ -65,11 +65,20 @@ class MultiTd : public Actor {
|
||||
class TdReceiver {
|
||||
public:
|
||||
MultiClient::Response receive(double timeout) {
|
||||
if (!responses_.empty()) {
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
if (include_responses && !responses_.empty()) {
|
||||
auto result = std::move(responses_.front());
|
||||
responses_.pop();
|
||||
return result;
|
||||
}
|
||||
if (include_updates && !updates_.empty()) {
|
||||
auto result = std::move(updates_.front());
|
||||
updates_.pop();
|
||||
return result;
|
||||
}
|
||||
return {0, 0, nullptr};
|
||||
}
|
||||
|
||||
@ -89,7 +98,7 @@ class TdReceiver {
|
||||
Callback(Callback &&) = delete;
|
||||
Callback &operator=(Callback &&) = delete;
|
||||
~Callback() override {
|
||||
impl_->responses_.push({client_id_, 0, nullptr});
|
||||
impl_->updates_.push({client_id_, 0, nullptr});
|
||||
}
|
||||
|
||||
private:
|
||||
@ -100,6 +109,7 @@ class TdReceiver {
|
||||
}
|
||||
|
||||
private:
|
||||
std::queue<MultiClient::Response> updates_;
|
||||
std::queue<MultiClient::Response> responses_;
|
||||
};
|
||||
|
||||
@ -129,6 +139,10 @@ class MultiClient::Impl final {
|
||||
}
|
||||
|
||||
Response receive(double timeout) {
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
if (!requests_.empty()) {
|
||||
auto guard = concurrent_scheduler_->get_main_guard();
|
||||
for (auto &request : requests_) {
|
||||
@ -139,10 +153,10 @@ class MultiClient::Impl final {
|
||||
requests_.clear();
|
||||
}
|
||||
|
||||
auto response = receiver_->receive(0);
|
||||
auto response = receiver_->receive(0, include_responses, include_updates);
|
||||
if (response.client_id == 0) {
|
||||
concurrent_scheduler_->run_main(0);
|
||||
response = receiver_->receive(0);
|
||||
response = receiver_->receive(0, include_responses, include_updates);
|
||||
} else {
|
||||
ConcurrentScheduler::emscripten_clear_main_timeout();
|
||||
}
|
||||
@ -195,7 +209,11 @@ class Client::Impl final {
|
||||
}
|
||||
|
||||
Response receive(double timeout) {
|
||||
auto response = impl_.receive(timeout);
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
auto response = impl_.receive(timeout, include_responses, include_updates);
|
||||
Response old_response;
|
||||
old_response.id = response.id;
|
||||
old_response.object = std::move(response.object);
|
||||
@ -212,15 +230,21 @@ class Client::Impl final {
|
||||
class TdReceiver {
|
||||
public:
|
||||
TdReceiver() {
|
||||
output_queue_ = std::make_shared<OutputQueue>();
|
||||
output_queue_->init();
|
||||
output_updates_queue_ = std::make_shared<OutputQueue>();
|
||||
output_updates_queue_->init();
|
||||
output_responses_queue_ = std::make_shared<OutputQueue>();
|
||||
output_responses_queue_->init();
|
||||
}
|
||||
|
||||
MultiClient::Response receive(double timeout) {
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
MultiClient::Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
|
||||
auto is_locked = receive_lock_.exchange(true);
|
||||
CHECK(!is_locked);
|
||||
auto response = receive_unlocked(timeout);
|
||||
auto response = receive_unlocked(timeout, include_responses, include_updates);
|
||||
is_locked = receive_lock_.exchange(false);
|
||||
CHECK(is_locked);
|
||||
VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get();
|
||||
@ -230,47 +254,61 @@ class TdReceiver {
|
||||
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
|
||||
class Callback : public TdCallback {
|
||||
public:
|
||||
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_queue)
|
||||
: client_id_(client_id), output_queue_(std::move(output_queue)) {
|
||||
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_updates_queue, std::shared_ptr<OutputQueue> output_responses_queue)
|
||||
: client_id_(client_id), output_updates_queue_(std::move(output_updates_queue)), output_responses_queue_(std::move(output_responses_queue)) {
|
||||
}
|
||||
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
|
||||
output_queue_->writer_put({client_id_, id, std::move(result)});
|
||||
output_responses_queue_->writer_put({client_id_, id, std::move(result)});
|
||||
}
|
||||
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
|
||||
output_queue_->writer_put({client_id_, id, std::move(error)});
|
||||
output_responses_queue_->writer_put({client_id_, id, std::move(error)});
|
||||
}
|
||||
Callback(const Callback &) = delete;
|
||||
Callback &operator=(const Callback &) = delete;
|
||||
Callback(Callback &&) = delete;
|
||||
Callback &operator=(Callback &&) = delete;
|
||||
~Callback() override {
|
||||
output_queue_->writer_put({client_id_, 0, nullptr});
|
||||
output_updates_queue_->writer_put({client_id_, 0, nullptr});
|
||||
}
|
||||
|
||||
private:
|
||||
MultiClient::ClientId client_id_;
|
||||
std::shared_ptr<OutputQueue> output_queue_;
|
||||
std::shared_ptr<OutputQueue> output_updates_queue_;
|
||||
std::shared_ptr<OutputQueue> output_responses_queue_;
|
||||
};
|
||||
return td::make_unique<Callback>(client_id, output_queue_);
|
||||
return td::make_unique<Callback>(client_id, output_updates_queue_, output_responses_queue_);
|
||||
}
|
||||
|
||||
private:
|
||||
using OutputQueue = MpscPollableQueue<MultiClient::Response>;
|
||||
std::shared_ptr<OutputQueue> output_queue_;
|
||||
int output_queue_ready_cnt_{0};
|
||||
std::shared_ptr<OutputQueue> output_updates_queue_;
|
||||
std::shared_ptr<OutputQueue> output_responses_queue_;
|
||||
int output_updates_queue_ready_cnt_{0};
|
||||
int output_responses_queue_ready_cnt_{0};
|
||||
std::atomic<bool> receive_lock_{false};
|
||||
|
||||
MultiClient::Response receive_unlocked(double timeout) {
|
||||
if (output_queue_ready_cnt_ == 0) {
|
||||
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
|
||||
MultiClient::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) {
|
||||
if (output_responses_queue_ready_cnt_ == 0) {
|
||||
output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock();
|
||||
}
|
||||
if (output_queue_ready_cnt_ > 0) {
|
||||
output_queue_ready_cnt_--;
|
||||
return output_queue_->reader_get_unsafe();
|
||||
if (output_responses_queue_ready_cnt_ > 0) {
|
||||
output_responses_queue_ready_cnt_--;
|
||||
return output_responses_queue_->reader_get_unsafe();
|
||||
}
|
||||
if (output_updates_queue_ready_cnt_ == 0) {
|
||||
output_updates_queue_ready_cnt_ = output_updates_queue_->reader_wait_nonblock();
|
||||
}
|
||||
if (output_updates_queue_ready_cnt_ > 0) {
|
||||
output_updates_queue_ready_cnt_--;
|
||||
return output_updates_queue_->reader_get_unsafe();
|
||||
}
|
||||
if (timeout != 0) {
|
||||
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
||||
return receive_unlocked(0);
|
||||
if (include_responses && !include_updates) {
|
||||
output_responses_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
||||
} else {
|
||||
output_updates_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
||||
}
|
||||
return receive_unlocked(0, include_responses, include_updates);
|
||||
}
|
||||
return {0, 0, nullptr};
|
||||
}
|
||||
@ -387,7 +425,11 @@ class MultiClient::Impl final {
|
||||
}
|
||||
|
||||
Response receive(double timeout) {
|
||||
auto res = receiver_->receive(timeout);
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
auto res = receiver_->receive(timeout, include_responses, include_updates);
|
||||
if (res.client_id != 0 && !res.object) {
|
||||
auto lock = impls_mutex_.lock_write().move_as_ok();
|
||||
impls_.erase(res.client_id);
|
||||
@ -405,7 +447,7 @@ class MultiClient::Impl final {
|
||||
it.second->close(it.first);
|
||||
}
|
||||
while (!impls_.empty()) {
|
||||
receive(10);
|
||||
receive(10, true, true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -435,7 +477,11 @@ class Client::Impl final {
|
||||
}
|
||||
|
||||
Client::Response receive(double timeout) {
|
||||
auto res = receiver_->receive(timeout);
|
||||
return receive(timeout, true, true);
|
||||
}
|
||||
|
||||
Client::Response receive(double timeout, bool include_responses, bool include_updates) {
|
||||
auto res = receiver_->receive(timeout, include_responses, include_updates);
|
||||
|
||||
if (res.client_id != 0 && !res.object) {
|
||||
is_closed_ = true;
|
||||
@ -454,7 +500,7 @@ class Client::Impl final {
|
||||
~Impl() {
|
||||
multi_impl_->close(td_id_);
|
||||
while (!is_closed_) {
|
||||
receive(10);
|
||||
receive(10, true, true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -478,6 +524,10 @@ Client::Response Client::receive(double timeout) {
|
||||
return impl_->receive(timeout);
|
||||
}
|
||||
|
||||
Client::Response Client::receive(double timeout, bool include_responses, bool include_updates) {
|
||||
return impl_->receive(timeout, include_responses, include_updates);
|
||||
}
|
||||
|
||||
Client::Response Client::execute(Request &&request) {
|
||||
Response response;
|
||||
response.id = request.id;
|
||||
@ -504,6 +554,10 @@ MultiClient::Response MultiClient::receive(double timeout) {
|
||||
return impl_->receive(timeout);
|
||||
}
|
||||
|
||||
MultiClient::Response MultiClient::receive(double timeout, bool include_responses, bool include_updates) {
|
||||
return impl_->receive(timeout, include_responses, include_updates);
|
||||
}
|
||||
|
||||
MultiClient::Object MultiClient::execute(Function &&function) {
|
||||
return Td::static_request(std::move(function));
|
||||
}
|
||||
|
@ -103,6 +103,17 @@ class Client final {
|
||||
*/
|
||||
Response receive(double timeout);
|
||||
|
||||
/**
|
||||
* Receives incoming updates and request responses from TDLib. May be called from any thread, but shouldn't be
|
||||
* called simultaneously from two different threads.
|
||||
* \param[in] timeout The maximum number of seconds allowed for this function to wait for new data.
|
||||
* \param[in] include_responses Include request responses from TDLib.
|
||||
* \param[in] include_responses Include updates from TDLib.
|
||||
* \return An incoming update or request response. The object returned in the response may be a nullptr
|
||||
* if the timeout expires.
|
||||
*/
|
||||
Response receive(double timeout, bool include_responses, bool include_updates);
|
||||
|
||||
/**
|
||||
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
|
||||
* May be called from any thread.
|
||||
@ -152,6 +163,8 @@ class MultiClient final {
|
||||
|
||||
Response receive(double timeout);
|
||||
|
||||
Response receive(double timeout, bool include_responses, bool include_updates);
|
||||
|
||||
static Object execute(Function &&function);
|
||||
|
||||
~MultiClient();
|
||||
|
Loading…
x
Reference in New Issue
Block a user