Revert standard client implementation
This commit is contained in:
parent
f5ecc4b896
commit
d3b2f81269
@ -20,8 +20,6 @@
|
|||||||
#include "td/utils/port/RwMutex.h"
|
#include "td/utils/port/RwMutex.h"
|
||||||
#include "td/utils/port/thread.h"
|
#include "td/utils/port/thread.h"
|
||||||
|
|
||||||
#include "td/utils/death_handler.h"
|
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
@ -35,20 +33,11 @@ namespace td {
|
|||||||
class TdReceiver {
|
class TdReceiver {
|
||||||
public:
|
public:
|
||||||
ClientManager::Response receive(double timeout) {
|
ClientManager::Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
if (!responses_.empty()) {
|
||||||
}
|
|
||||||
|
|
||||||
ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
if (include_responses && !responses_.empty()) {
|
|
||||||
auto result = std::move(responses_.front());
|
auto result = std::move(responses_.front());
|
||||||
responses_.pop();
|
responses_.pop();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
if (include_updates && !updates_.empty()) {
|
|
||||||
auto result = std::move(updates_.front());
|
|
||||||
updates_.pop();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
return {0, 0, nullptr};
|
return {0, 0, nullptr};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,30 +47,17 @@ class TdReceiver {
|
|||||||
Callback(ClientManager::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
|
Callback(ClientManager::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
|
||||||
}
|
}
|
||||||
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
|
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
|
||||||
if (id == 0) {
|
impl_->responses_.push({client_id_, id, std::move(result)});
|
||||||
impl_->responses_.push({client_id_, id, nullptr});
|
|
||||||
impl_->updates_.push({client_id_, 0, std::move(result)});
|
|
||||||
} else {
|
|
||||||
impl_->responses_.push({client_id_, id, std::move(result)});
|
|
||||||
impl_->updates_.push({client_id_, id, nullptr});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
|
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
|
||||||
if (id == 0) {
|
impl_->responses_.push({client_id_, id, std::move(error)});
|
||||||
impl_->responses_.push({client_id_, 0, nullptr});
|
|
||||||
impl_->updates_.push({client_id_, 0, std::move(error)});
|
|
||||||
} else {
|
|
||||||
impl_->responses_.push({client_id_, id, std::move(error)});
|
|
||||||
impl_->updates_.push({client_id_, id, nullptr});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Callback(const Callback &) = delete;
|
Callback(const Callback &) = delete;
|
||||||
Callback &operator=(const Callback &) = delete;
|
Callback &operator=(const Callback &) = delete;
|
||||||
Callback(Callback &&) = delete;
|
Callback(Callback &&) = delete;
|
||||||
Callback &operator=(Callback &&) = delete;
|
Callback &operator=(Callback &&) = delete;
|
||||||
~Callback() override {
|
~Callback() override {
|
||||||
//impl_->responses_.push({0, 0, nullptr});
|
impl_->responses_.push({client_id_, 0, nullptr});
|
||||||
impl_->updates_.push({client_id_, 0, nullptr});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -96,7 +72,6 @@ class TdReceiver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::queue<ClientManager::Response> updates_;
|
|
||||||
std::queue<ClientManager::Response> responses_;
|
std::queue<ClientManager::Response> responses_;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -122,10 +97,6 @@ class ClientManager::Impl final {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Response receive(double timeout) {
|
Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
if (!requests_.empty()) {
|
if (!requests_.empty()) {
|
||||||
for (size_t i = 0; i < requests_.size(); i++) {
|
for (size_t i = 0; i < requests_.size(); i++) {
|
||||||
auto &request = requests_[i];
|
auto &request = requests_[i];
|
||||||
@ -148,10 +119,10 @@ class ClientManager::Impl final {
|
|||||||
requests_.clear();
|
requests_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto response = receiver_.receive(0, include_responses, include_updates);
|
auto response = receiver_.receive(0);
|
||||||
if (response.client_id == 0 && concurrent_scheduler_ != nullptr) {
|
if (response.client_id == 0 && concurrent_scheduler_ != nullptr) {
|
||||||
concurrent_scheduler_->run_main(0);
|
concurrent_scheduler_->run_main(0);
|
||||||
response = receiver_.receive(0, include_responses, include_updates);
|
response = receiver_.receive(0);
|
||||||
} else {
|
} else {
|
||||||
ConcurrentScheduler::emscripten_clear_main_timeout();
|
ConcurrentScheduler::emscripten_clear_main_timeout();
|
||||||
}
|
}
|
||||||
@ -205,7 +176,7 @@ class ClientManager::Impl final {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (!tds_.empty() && !ExitGuard::is_exited()) {
|
while (!tds_.empty() && !ExitGuard::is_exited()) {
|
||||||
receive(0.1, false, true);
|
receive(0.1);
|
||||||
}
|
}
|
||||||
if (!ExitGuard::is_exited()) { // prevent closing of schedulers from already killed by OS threads
|
if (!ExitGuard::is_exited()) { // prevent closing of schedulers from already killed by OS threads
|
||||||
concurrent_scheduler_->finish();
|
concurrent_scheduler_->finish();
|
||||||
@ -236,11 +207,8 @@ class Client::Impl final {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Response receive(double timeout) {
|
Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
auto response = impl_.receive(timeout);
|
||||||
}
|
|
||||||
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
auto response = impl_.receive(timeout, include_responses, include_updates);
|
|
||||||
Response old_response;
|
Response old_response;
|
||||||
old_response.id = response.request_id;
|
old_response.id = response.request_id;
|
||||||
old_response.object = std::move(response.object);
|
old_response.object = std::move(response.object);
|
||||||
@ -291,41 +259,19 @@ class MultiTd : public Actor {
|
|||||||
class TdReceiver {
|
class TdReceiver {
|
||||||
public:
|
public:
|
||||||
TdReceiver() {
|
TdReceiver() {
|
||||||
output_responses_queue_ = std::make_shared<OutputQueue>();
|
output_queue_ = std::make_shared<OutputQueue>();
|
||||||
output_responses_queue_->init();
|
output_queue_->init();
|
||||||
output_updates_queue_ = std::make_shared<OutputQueue>();
|
|
||||||
output_updates_queue_->init();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientManager::Response receive(double timeout) {
|
ClientManager::Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
ClientManager::Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
|
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
|
||||||
bool is_responses_locked = false;
|
auto is_locked = receive_lock_.exchange(true);
|
||||||
bool is_updates_locked = false;
|
if (is_locked) {
|
||||||
if (include_responses) {
|
LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads";
|
||||||
is_responses_locked = receive_responses_lock_.exchange(true);
|
|
||||||
if (is_responses_locked) {
|
|
||||||
LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (include_updates) {
|
|
||||||
is_updates_locked = receive_updates_lock_.exchange(true);
|
|
||||||
if (is_updates_locked) {
|
|
||||||
LOG(FATAL) << "Receive is called after Client destroy, or simultaneously from different threads";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
auto response = receive_unlocked(timeout, include_responses, include_updates);
|
|
||||||
if (include_updates) {
|
|
||||||
is_updates_locked = receive_updates_lock_.exchange(false);
|
|
||||||
CHECK(is_updates_locked);
|
|
||||||
}
|
|
||||||
if (include_responses) {
|
|
||||||
is_responses_locked = receive_responses_lock_.exchange(false);
|
|
||||||
CHECK(is_responses_locked);
|
|
||||||
}
|
}
|
||||||
|
auto response = receive_unlocked(timeout);
|
||||||
|
is_locked = receive_lock_.exchange(false);
|
||||||
|
CHECK(is_locked);
|
||||||
VLOG(td_requests) << "End to wait for updates, returning object " << response.request_id << ' '
|
VLOG(td_requests) << "End to wait for updates, returning object " << response.request_id << ' '
|
||||||
<< response.object.get();
|
<< response.object.get();
|
||||||
return response;
|
return response;
|
||||||
@ -334,93 +280,51 @@ class TdReceiver {
|
|||||||
unique_ptr<TdCallback> create_callback(ClientManager::ClientId client_id) {
|
unique_ptr<TdCallback> create_callback(ClientManager::ClientId client_id) {
|
||||||
class Callback : public TdCallback {
|
class Callback : public TdCallback {
|
||||||
public:
|
public:
|
||||||
explicit Callback(ClientManager::ClientId client_id, std::shared_ptr<OutputQueue> output_responses_queue, std::shared_ptr<OutputQueue> output_updates_queue)
|
explicit Callback(ClientManager::ClientId client_id, std::shared_ptr<OutputQueue> output_queue)
|
||||||
: client_id_(client_id), output_responses_queue_(std::move(output_responses_queue)), output_updates_queue_(std::move(output_updates_queue)) {
|
: client_id_(client_id), output_queue_(std::move(output_queue)) {
|
||||||
}
|
}
|
||||||
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
|
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
|
||||||
if (id == 0) {
|
output_queue_->writer_put({client_id_, id, std::move(result)});
|
||||||
output_responses_queue_->writer_put({0, 0, nullptr});
|
|
||||||
output_updates_queue_->writer_put({client_id_, 0, std::move(result)});
|
|
||||||
} else {
|
|
||||||
output_responses_queue_->writer_put({client_id_, id, std::move(result)});
|
|
||||||
output_updates_queue_->writer_put({0, 0, nullptr});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
|
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
|
||||||
if (id == 0) {
|
output_queue_->writer_put({client_id_, id, std::move(error)});
|
||||||
output_responses_queue_->writer_put({0, 0, nullptr});
|
|
||||||
output_updates_queue_->writer_put({client_id_, 0, std::move(error)});
|
|
||||||
} else {
|
|
||||||
output_responses_queue_->writer_put({client_id_, id, std::move(error)});
|
|
||||||
output_updates_queue_->writer_put({0, 0, nullptr});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Callback(const Callback &) = delete;
|
Callback(const Callback &) = delete;
|
||||||
Callback &operator=(const Callback &) = delete;
|
Callback &operator=(const Callback &) = delete;
|
||||||
Callback(Callback &&) = delete;
|
Callback(Callback &&) = delete;
|
||||||
Callback &operator=(Callback &&) = delete;
|
Callback &operator=(Callback &&) = delete;
|
||||||
~Callback() override {
|
~Callback() override {
|
||||||
//output_responses_queue_->writer_put({0, 0, nullptr});
|
output_queue_->writer_put({client_id_, 0, nullptr});
|
||||||
output_updates_queue_->writer_put({client_id_, 0, nullptr});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ClientManager::ClientId client_id_;
|
ClientManager::ClientId client_id_;
|
||||||
std::shared_ptr<OutputQueue> output_responses_queue_;
|
std::shared_ptr<OutputQueue> output_queue_;
|
||||||
std::shared_ptr<OutputQueue> output_updates_queue_;
|
|
||||||
};
|
};
|
||||||
return td::make_unique<Callback>(client_id, output_responses_queue_, output_updates_queue_);
|
return td::make_unique<Callback>(client_id, output_queue_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void add_response(ClientManager::ClientId client_id, uint64 id, td_api::object_ptr<td_api::Object> result) {
|
void add_response(ClientManager::ClientId client_id, uint64 id, td_api::object_ptr<td_api::Object> result) {
|
||||||
if (id == 0) {
|
output_queue_->writer_put({client_id, id, std::move(result)});
|
||||||
output_responses_queue_->writer_put({0, 0, nullptr});
|
|
||||||
output_updates_queue_->writer_put({client_id, id, std::move(result)});
|
|
||||||
} else {
|
|
||||||
output_responses_queue_->writer_put({client_id, id, std::move(result)});
|
|
||||||
output_updates_queue_->writer_put({0, 0, nullptr});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using OutputQueue = MpscPollableQueue<ClientManager::Response>;
|
using OutputQueue = MpscPollableQueue<ClientManager::Response>;
|
||||||
std::shared_ptr<OutputQueue> output_responses_queue_;
|
std::shared_ptr<OutputQueue> output_queue_;
|
||||||
std::shared_ptr<OutputQueue> output_updates_queue_;
|
int output_queue_ready_cnt_{0};
|
||||||
int output_responses_queue_ready_cnt_{0};
|
std::atomic<bool> receive_lock_{false};
|
||||||
int output_updates_queue_ready_cnt_{0};
|
|
||||||
std::atomic<bool> receive_responses_lock_{false};
|
|
||||||
std::atomic<bool> receive_updates_lock_{false};
|
|
||||||
|
|
||||||
ClientManager::Response receive_unlocked(double timeout, bool include_responses, bool include_updates) {
|
ClientManager::Response receive_unlocked(double timeout) {
|
||||||
if (include_responses) {
|
if (output_queue_ready_cnt_ == 0) {
|
||||||
if (output_responses_queue_ready_cnt_ == 0) {
|
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
|
||||||
output_responses_queue_ready_cnt_ = output_responses_queue_->reader_wait_nonblock();
|
|
||||||
}
|
|
||||||
if (output_responses_queue_ready_cnt_ > 0) {
|
|
||||||
output_responses_queue_ready_cnt_--;
|
|
||||||
return output_responses_queue_->reader_get_unsafe();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (include_updates) {
|
if (output_queue_ready_cnt_ > 0) {
|
||||||
if (output_updates_queue_ready_cnt_ == 0) {
|
output_queue_ready_cnt_--;
|
||||||
output_updates_queue_ready_cnt_ = output_updates_queue_->reader_wait_nonblock();
|
return output_queue_->reader_get_unsafe();
|
||||||
}
|
|
||||||
if (output_updates_queue_ready_cnt_ > 0) {
|
|
||||||
output_updates_queue_ready_cnt_--;
|
|
||||||
return output_updates_queue_->reader_get_unsafe();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (timeout != 0) {
|
if (timeout != 0) {
|
||||||
if (include_responses && !include_updates) {
|
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
||||||
output_responses_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
return receive_unlocked(0);
|
||||||
} else if (!include_responses && include_updates) {
|
|
||||||
output_updates_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
|
||||||
} else if (include_responses && include_updates) {
|
|
||||||
output_updates_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
|
||||||
} else {
|
|
||||||
// This configuration (include_responses = false and include_updates = false) shouldn't be used.
|
|
||||||
}
|
|
||||||
return receive_unlocked(0, include_responses, include_updates);
|
|
||||||
}
|
}
|
||||||
return {0, 0, nullptr};
|
return {0, 0, nullptr};
|
||||||
}
|
}
|
||||||
@ -575,15 +479,11 @@ class ClientManager::Impl final {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Response receive(double timeout) {
|
Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
auto response = receiver_.receive(timeout);
|
||||||
}
|
|
||||||
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
auto response = receiver_.receive(timeout, include_responses, include_updates);
|
|
||||||
if (response.request_id == 0 && response.object != nullptr &&
|
if (response.request_id == 0 && response.object != nullptr &&
|
||||||
response.object->get_id() == td_api::updateAuthorizationState::ID &&
|
response.object->get_id() == td_api::updateAuthorizationState::ID &&
|
||||||
static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() ==
|
static_cast<const td_api::updateAuthorizationState *>(response.object.get())->authorization_state_->get_id() ==
|
||||||
td_api::authorizationStateClosed::ID) {
|
td_api::authorizationStateClosed::ID) {
|
||||||
auto lock = impls_mutex_.lock_write().move_as_ok();
|
auto lock = impls_mutex_.lock_write().move_as_ok();
|
||||||
close_impl(response.client_id);
|
close_impl(response.client_id);
|
||||||
|
|
||||||
@ -630,7 +530,7 @@ class ClientManager::Impl final {
|
|||||||
close_impl(it.first);
|
close_impl(it.first);
|
||||||
}
|
}
|
||||||
while (!impls_.empty() && !ExitGuard::is_exited()) {
|
while (!impls_.empty() && !ExitGuard::is_exited()) {
|
||||||
receive(0.1, false, true);
|
receive(0.1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -663,11 +563,7 @@ class Client::Impl final {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Response receive(double timeout) {
|
Response receive(double timeout) {
|
||||||
return receive(timeout, true, true);
|
auto response = receiver_.receive(timeout);
|
||||||
}
|
|
||||||
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
auto response = receiver_.receive(timeout, include_responses, include_updates);
|
|
||||||
|
|
||||||
Response old_response;
|
Response old_response;
|
||||||
old_response.id = response.request_id;
|
old_response.id = response.request_id;
|
||||||
@ -682,7 +578,7 @@ class Client::Impl final {
|
|||||||
~Impl() {
|
~Impl() {
|
||||||
multi_impl_->close(td_id_);
|
multi_impl_->close(td_id_);
|
||||||
while (!ExitGuard::is_exited()) {
|
while (!ExitGuard::is_exited()) {
|
||||||
auto response = receiver_.receive(0.1, false, true);
|
auto response = receiver_.receive(0.1);
|
||||||
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
|
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -708,10 +604,6 @@ Client::Response Client::receive(double timeout) {
|
|||||||
return impl_->receive(timeout);
|
return impl_->receive(timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
Client::Response Client::receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
return impl_->receive(timeout, include_responses, include_updates);
|
|
||||||
}
|
|
||||||
|
|
||||||
Client::Response Client::execute(Request &&request) {
|
Client::Response Client::execute(Request &&request) {
|
||||||
Response response;
|
Response response;
|
||||||
response.id = request.id;
|
response.id = request.id;
|
||||||
@ -738,10 +630,6 @@ ClientManager::Response ClientManager::receive(double timeout) {
|
|||||||
return impl_->receive(timeout);
|
return impl_->receive(timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientManager::Response ClientManager::receive(double timeout, bool include_responses, bool include_updates) {
|
|
||||||
return impl_->receive(timeout, include_responses, include_updates);
|
|
||||||
}
|
|
||||||
|
|
||||||
td_api::object_ptr<td_api::Object> ClientManager::execute(td_api::object_ptr<td_api::Function> &&request) {
|
td_api::object_ptr<td_api::Object> ClientManager::execute(td_api::object_ptr<td_api::Function> &&request) {
|
||||||
return Td::static_request(std::move(request));
|
return Td::static_request(std::move(request));
|
||||||
}
|
}
|
||||||
|
@ -103,17 +103,6 @@ class Client final {
|
|||||||
*/
|
*/
|
||||||
Response receive(double timeout);
|
Response receive(double timeout);
|
||||||
|
|
||||||
/**
|
|
||||||
* Receives incoming updates and request responses from TDLib. May be called from any thread, but shouldn't be
|
|
||||||
* called simultaneously from two different threads.
|
|
||||||
* \param[in] timeout The maximum number of seconds allowed for this function to wait for new data.
|
|
||||||
* \param[in] include_responses Include request responses from TDLib.
|
|
||||||
* \param[in] include_responses Include updates from TDLib.
|
|
||||||
* \return An incoming update or request response. The object returned in the response may be a nullptr
|
|
||||||
* if the timeout expires.
|
|
||||||
*/
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
|
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
|
||||||
* May be called from any thread.
|
* May be called from any thread.
|
||||||
@ -235,8 +224,6 @@ class ClientManager final {
|
|||||||
*/
|
*/
|
||||||
Response receive(double timeout);
|
Response receive(double timeout);
|
||||||
|
|
||||||
Response receive(double timeout, bool include_responses, bool include_updates);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
|
* Synchronously executes TDLib requests. Only a few requests can be executed synchronously.
|
||||||
* May be called from any thread.
|
* May be called from any thread.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user