MuliClient: quick fix of TdReceiver

GitOrigin-RevId: feae0c3caffb5ebb4ef0402dbb7e2f3a15e714be
This commit is contained in:
Arseny Smirnov 2020-07-30 17:38:36 +03:00
parent cacabaf6d1
commit 4635b7b791

View File

@ -227,18 +227,14 @@ class TdReceiver {
} }
MultiClient::Response receive(double timeout) { MultiClient::Response receive(double timeout) {
if (output_queue_ready_cnt_ == 0) { VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock(); auto is_locked = receive_lock_.exchange(true);
} CHECK(!is_locked);
if (output_queue_ready_cnt_ > 0) { auto response = receive_unlocked(timeout);
output_queue_ready_cnt_--; is_locked = receive_lock_.exchange(false);
return output_queue_->reader_get_unsafe(); CHECK(is_locked);
} VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get();
if (timeout != 0) { return response;
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
return receive(0);
}
return {0, 0, nullptr};
} }
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) { unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
@ -272,6 +268,22 @@ class TdReceiver {
using OutputQueue = MpscPollableQueue<MultiClient::Response>; using OutputQueue = MpscPollableQueue<MultiClient::Response>;
std::shared_ptr<OutputQueue> output_queue_; std::shared_ptr<OutputQueue> output_queue_;
int output_queue_ready_cnt_{0}; int output_queue_ready_cnt_{0};
std::atomic<bool> receive_lock_{false};
MultiClient::Response receive_unlocked(double timeout) {
if (output_queue_ready_cnt_ == 0) {
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
}
if (output_queue_ready_cnt_ > 0) {
output_queue_ready_cnt_--;
return output_queue_->reader_get_unsafe();
}
if (timeout != 0) {
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
return receive(0);
}
return {0, 0, nullptr};
}
}; };
class MultiImpl { class MultiImpl {
@ -435,7 +447,7 @@ class Client::Impl final {
} }
Client::Response receive(double timeout) { Client::Response receive(double timeout) {
auto res = receiver_->receive(0); auto res = receiver_->receive(timeout);
if (res.client_id != 0 && !res.object) { if (res.client_id != 0 && !res.object) {
is_closed_ = true; is_closed_ = true;