MultiClient: draft
GitOrigin-RevId: 4d1bdd6ad99909ce7ad94cfd32a43262051a6d18
This commit is contained in:
parent
2e0d642a98
commit
b1222a9bb7
@ -15,6 +15,7 @@
|
|||||||
#include "td/utils/logging.h"
|
#include "td/utils/logging.h"
|
||||||
#include "td/utils/misc.h"
|
#include "td/utils/misc.h"
|
||||||
#include "td/utils/MpscPollableQueue.h"
|
#include "td/utils/MpscPollableQueue.h"
|
||||||
|
#include "td/utils/port/RwMutex.h"
|
||||||
#include "td/utils/port/thread.h"
|
#include "td/utils/port/thread.h"
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
@ -26,91 +27,13 @@
|
|||||||
|
|
||||||
namespace td {
|
namespace td {
|
||||||
|
|
||||||
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
class TdReceiver {
|
||||||
|
|
||||||
class Client::Impl final {
|
|
||||||
public:
|
public:
|
||||||
Impl() {
|
virtual ~TdReceiver() = default;
|
||||||
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
|
virtual MultiClient::Response receive(double timeout) = 0;
|
||||||
concurrent_scheduler_->init(0);
|
virtual unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) = 0;
|
||||||
class Callback : public TdCallback {
|
|
||||||
public:
|
|
||||||
explicit Callback(Impl *client) : client_(client) {
|
|
||||||
}
|
|
||||||
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
|
|
||||||
client_->responses_.push_back({id, std::move(result)});
|
|
||||||
}
|
|
||||||
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
|
|
||||||
client_->responses_.push_back({id, std::move(error)});
|
|
||||||
}
|
|
||||||
|
|
||||||
Callback(const Callback &) = delete;
|
|
||||||
Callback &operator=(const Callback &) = delete;
|
|
||||||
Callback(Callback &&) = delete;
|
|
||||||
Callback &operator=(Callback &&) = delete;
|
|
||||||
~Callback() override {
|
|
||||||
client_->closed_ = true;
|
|
||||||
Scheduler::instance()->yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
Impl *client_;
|
|
||||||
};
|
|
||||||
td_ = concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", make_unique<Callback>(this));
|
|
||||||
concurrent_scheduler_->start();
|
|
||||||
}
|
|
||||||
|
|
||||||
void send(Request request) {
|
|
||||||
requests_.push_back(std::move(request));
|
|
||||||
}
|
|
||||||
|
|
||||||
Response receive(double timeout) {
|
|
||||||
if (!requests_.empty()) {
|
|
||||||
auto guard = concurrent_scheduler_->get_main_guard();
|
|
||||||
for (auto &request : requests_) {
|
|
||||||
send_closure_later(td_, &Td::request, request.id, std::move(request.function));
|
|
||||||
}
|
|
||||||
requests_.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (responses_.empty()) {
|
|
||||||
concurrent_scheduler_->run_main(0);
|
|
||||||
} else {
|
|
||||||
ConcurrentScheduler::emscripten_clear_main_timeout();
|
|
||||||
}
|
|
||||||
if (!responses_.empty()) {
|
|
||||||
auto result = std::move(responses_.front());
|
|
||||||
responses_.pop_front();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
return {0, nullptr};
|
|
||||||
}
|
|
||||||
|
|
||||||
Impl(const Impl &) = delete;
|
|
||||||
Impl &operator=(const Impl &) = delete;
|
|
||||||
Impl(Impl &&) = delete;
|
|
||||||
Impl &operator=(Impl &&) = delete;
|
|
||||||
~Impl() {
|
|
||||||
{
|
|
||||||
auto guard = concurrent_scheduler_->get_main_guard();
|
|
||||||
td_.reset();
|
|
||||||
}
|
|
||||||
while (!closed_) {
|
|
||||||
concurrent_scheduler_->run_main(0);
|
|
||||||
}
|
|
||||||
concurrent_scheduler_.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::deque<Response> responses_;
|
|
||||||
std::vector<Request> requests_;
|
|
||||||
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
|
||||||
ActorOwn<Td> td_;
|
|
||||||
bool closed_ = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#else
|
|
||||||
|
|
||||||
class MultiTd : public Actor {
|
class MultiTd : public Actor {
|
||||||
public:
|
public:
|
||||||
void create(int32 td_id, unique_ptr<TdCallback> callback) {
|
void create(int32 td_id, unique_ptr<TdCallback> callback) {
|
||||||
@ -134,10 +57,10 @@ class MultiTd : public Actor {
|
|||||||
set_context(old_context);
|
set_context(old_context);
|
||||||
set_tag(old_tag);
|
set_tag(old_tag);
|
||||||
}
|
}
|
||||||
void send(int32 td_id, Client::Request request) {
|
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
|
||||||
auto &td = tds_[td_id];
|
auto &td = tds_[client_id];
|
||||||
CHECK(!td.empty());
|
CHECK(!td.empty());
|
||||||
send_closure(td, &Td::request, request.id, std::move(request.function));
|
send_closure(td, &Td::request, request_id, std::move(function));
|
||||||
}
|
}
|
||||||
void destroy(int32 td_id) {
|
void destroy(int32 td_id) {
|
||||||
auto size = tds_.erase(td_id);
|
auto size = tds_.erase(td_id);
|
||||||
@ -148,25 +71,213 @@ class MultiTd : public Actor {
|
|||||||
std::unordered_map<int32, ActorOwn<Td> > tds_;
|
std::unordered_map<int32, ActorOwn<Td> > tds_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class MultiImpl {
|
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
|
||||||
|
class TdReceiverSimple : public TdReceiver {
|
||||||
public:
|
public:
|
||||||
static std::shared_ptr<MultiImpl> get() {
|
TdReceiverSimple() {
|
||||||
static std::mutex mutex;
|
}
|
||||||
static std::vector<std::weak_ptr<MultiImpl> > impls;
|
MultiClient::Response receive(double timeout) final {
|
||||||
std::unique_lock<std::mutex> lock(mutex);
|
if (!responses_.empty()) {
|
||||||
if (impls.size() == 0) {
|
auto result = std::move(responses_.front());
|
||||||
impls.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
|
responses_.pop_front();
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
auto &impl = *std::min_element(impls.begin(), impls.end(),
|
return {0, 0, nullptr};
|
||||||
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
|
}
|
||||||
auto res = impl.lock();
|
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) final {
|
||||||
if (!res) {
|
class Callback : public TdCallback {
|
||||||
res = std::make_shared<MultiImpl>();
|
public:
|
||||||
impl = res;
|
explicit Callback(MultiClient::ClientId client_id, TdReceiverSimple *impl) : client_id_(client_id), impl_(impl) {
|
||||||
}
|
}
|
||||||
return res;
|
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
|
||||||
|
impl_->responses_.push_back({client_id_, id, std::move(result)});
|
||||||
|
}
|
||||||
|
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
|
||||||
|
impl_->responses_.push_back({client_id_, id, std::move(error)});
|
||||||
|
}
|
||||||
|
Callback(const Callback &) = delete;
|
||||||
|
Callback &operator=(const Callback &) = delete;
|
||||||
|
Callback(Callback &&) = delete;
|
||||||
|
Callback &operator=(Callback &&) = delete;
|
||||||
|
~Callback() override {
|
||||||
|
impl_->responses_.push_back({client_id_, 0, nullptr});
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
MultiClient::ClientId client_id_;
|
||||||
|
TdReceiverSimple *impl_;
|
||||||
|
};
|
||||||
|
return td::make_unique<Callback>(client_id, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::deque<MultiClient::Response> responses_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class MultiClient::Impl final {
|
||||||
|
public:
|
||||||
|
Impl() {
|
||||||
|
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
|
||||||
|
concurrent_scheduler_->init(0);
|
||||||
|
receiver_ = make_unique<TdReceiverSimple>();
|
||||||
|
concurrent_scheduler_->start();
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientId create_client() {
|
||||||
|
auto client_id = ++client_id_;
|
||||||
|
tds_[client_id] = concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id));
|
||||||
|
return client_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
void send(ClientId client_id, RequestId request_id, Function function) {
|
||||||
|
Request request;
|
||||||
|
request.client_id = client_id;
|
||||||
|
request.id = request_id;
|
||||||
|
request.function = std::move(function);
|
||||||
|
requests_.push_back(std::move(request));
|
||||||
|
}
|
||||||
|
|
||||||
|
Response receive(double timeout) {
|
||||||
|
if (!requests_.empty()) {
|
||||||
|
auto guard = concurrent_scheduler_->get_main_guard();
|
||||||
|
for (auto &request : requests_) {
|
||||||
|
auto &td = tds_[request.client_id];
|
||||||
|
CHECK(!td.empty());
|
||||||
|
send_closure_later(td, &Td::request, request.id, std::move(request.function));
|
||||||
|
}
|
||||||
|
requests_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto response = receiver_->receive(0);
|
||||||
|
if (response.client_id == 0) {
|
||||||
|
concurrent_scheduler_->run_main(0);
|
||||||
|
response = receiver_->receive(0);
|
||||||
|
} else {
|
||||||
|
ConcurrentScheduler::emscripten_clear_main_timeout();
|
||||||
|
}
|
||||||
|
if (response.client_id != 0 && !response.object) {
|
||||||
|
auto guard = concurrent_scheduler_->get_main_guard();
|
||||||
|
tds_.erase(response.client_id);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
static Object execute(Function &&function) {
|
||||||
|
return Td::static_request(std::move(function));
|
||||||
|
}
|
||||||
|
|
||||||
|
~Impl() {
|
||||||
|
{
|
||||||
|
auto guard = concurrent_scheduler_->get_main_guard();
|
||||||
|
for (auto &td : tds_) {
|
||||||
|
td.second = {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (!tds_.empty()) {
|
||||||
|
receive(10);
|
||||||
|
}
|
||||||
|
concurrent_scheduler_->finish();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class Client::Impl;
|
||||||
|
td::unique_ptr<TdReceiver> receiver_;
|
||||||
|
struct Request {
|
||||||
|
ClientId client_id;
|
||||||
|
RequestId id;
|
||||||
|
Function function;
|
||||||
|
};
|
||||||
|
std::vector<Request> requests_;
|
||||||
|
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
||||||
|
ClientId client_id_{0};
|
||||||
|
std::unordered_map<int32, ActorOwn<Td> > tds_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Client::Impl final {
|
||||||
|
public:
|
||||||
|
Impl() {
|
||||||
|
client_id_ = impl_.create_client();
|
||||||
|
}
|
||||||
|
|
||||||
|
void send(Request request) {
|
||||||
|
impl_.send(client_id_, request.id, std::move(request.function));
|
||||||
|
}
|
||||||
|
|
||||||
|
Response receive(double timeout) {
|
||||||
|
auto response = impl_.receive(timeout);
|
||||||
|
Response old_response;
|
||||||
|
old_response.id = response.id;
|
||||||
|
old_response.object = std::move(response.object);
|
||||||
|
return old_response;
|
||||||
|
}
|
||||||
|
|
||||||
|
Impl(const Impl &) = delete;
|
||||||
|
Impl &operator=(const Impl &) = delete;
|
||||||
|
Impl(Impl &&) = delete;
|
||||||
|
Impl &operator=(Impl &&) = delete;
|
||||||
|
|
||||||
|
private:
|
||||||
|
MultiClient::Impl impl_;
|
||||||
|
MultiClient::ClientId client_id_;
|
||||||
|
};
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
class TdReceiverTs : public TdReceiver {
|
||||||
|
public:
|
||||||
|
TdReceiverTs() {
|
||||||
|
output_queue_ = std::make_shared<OutputQueue>();
|
||||||
|
output_queue_->init();
|
||||||
|
}
|
||||||
|
MultiClient::Response receive(double timeout) final {
|
||||||
|
if (output_queue_ready_cnt_ == 0) {
|
||||||
|
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
|
||||||
|
}
|
||||||
|
if (output_queue_ready_cnt_ > 0) {
|
||||||
|
output_queue_ready_cnt_--;
|
||||||
|
return output_queue_->reader_get_unsafe();
|
||||||
|
}
|
||||||
|
if (timeout != 0) {
|
||||||
|
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
||||||
|
return receive(0);
|
||||||
|
}
|
||||||
|
return {0, 0, nullptr};
|
||||||
|
}
|
||||||
|
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) final {
|
||||||
|
class Callback : public TdCallback {
|
||||||
|
public:
|
||||||
|
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_queue)
|
||||||
|
: client_id_(client_id), output_queue_(std::move(output_queue)) {
|
||||||
|
}
|
||||||
|
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
|
||||||
|
output_queue_->writer_put({client_id_, id, std::move(result)});
|
||||||
|
}
|
||||||
|
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
|
||||||
|
output_queue_->writer_put({client_id_, id, std::move(error)});
|
||||||
|
}
|
||||||
|
Callback(const Callback &) = delete;
|
||||||
|
Callback &operator=(const Callback &) = delete;
|
||||||
|
Callback(Callback &&) = delete;
|
||||||
|
Callback &operator=(Callback &&) = delete;
|
||||||
|
~Callback() override {
|
||||||
|
output_queue_->writer_put({client_id_, 0, nullptr});
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
MultiClient::ClientId client_id_;
|
||||||
|
std::shared_ptr<OutputQueue> output_queue_;
|
||||||
|
};
|
||||||
|
return td::make_unique<Callback>(client_id, output_queue_);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
using OutputQueue = MpscPollableQueue<MultiClient::Response>;
|
||||||
|
std::shared_ptr<OutputQueue> output_queue_;
|
||||||
|
int output_queue_ready_cnt_{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
class MultiImpl {
|
||||||
|
public:
|
||||||
MultiImpl() {
|
MultiImpl() {
|
||||||
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
|
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
|
||||||
concurrent_scheduler_->init(3);
|
concurrent_scheduler_->init(3);
|
||||||
@ -187,19 +298,19 @@ class MultiImpl {
|
|||||||
MultiImpl(MultiImpl &&) = delete;
|
MultiImpl(MultiImpl &&) = delete;
|
||||||
MultiImpl &operator=(MultiImpl &&) = delete;
|
MultiImpl &operator=(MultiImpl &&) = delete;
|
||||||
|
|
||||||
int32 create_id() {
|
int32 create(TdReceiver &receiver) {
|
||||||
static std::atomic<int32> id_{0};
|
auto id = create_id();
|
||||||
return id_.fetch_add(1) + 1;
|
create(id, receiver.create_callback(id));
|
||||||
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void create(int32 td_id, unique_ptr<TdCallback> callback) {
|
[[deprecated]] void send(int32 td_id, Client::Request request) {
|
||||||
auto guard = concurrent_scheduler_->get_send_guard();
|
auto guard = concurrent_scheduler_->get_send_guard();
|
||||||
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
|
send_closure(multi_td_, &MultiTd::send, td_id, request.id, std::move(request.function));
|
||||||
}
|
}
|
||||||
|
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
|
||||||
void send(int32 td_id, Client::Request request) {
|
|
||||||
auto guard = concurrent_scheduler_->get_send_guard();
|
auto guard = concurrent_scheduler_->get_send_guard();
|
||||||
send_closure(multi_td_, &MultiTd::send, td_id, std::move(request));
|
send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function));
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroy(int32 td_id) {
|
void destroy(int32 td_id) {
|
||||||
@ -221,40 +332,92 @@ class MultiImpl {
|
|||||||
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
|
||||||
thread scheduler_thread_;
|
thread scheduler_thread_;
|
||||||
ActorOwn<MultiTd> multi_td_;
|
ActorOwn<MultiTd> multi_td_;
|
||||||
|
|
||||||
|
int32 create_id() {
|
||||||
|
static std::atomic<int32> id_{0};
|
||||||
|
return id_.fetch_add(1) + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void create(int32 td_id, unique_ptr<TdCallback> callback) {
|
||||||
|
auto guard = concurrent_scheduler_->get_send_guard();
|
||||||
|
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class MultiImplPool {
|
||||||
|
public:
|
||||||
|
std::shared_ptr<MultiImpl> get() {
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
if (impls_.size() == 0) {
|
||||||
|
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
|
||||||
|
}
|
||||||
|
auto &impl = *std::min_element(impls_.begin(), impls_.end(),
|
||||||
|
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
|
||||||
|
auto res = impl.lock();
|
||||||
|
if (!res) {
|
||||||
|
res = std::make_shared<MultiImpl>();
|
||||||
|
impl = res;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex mutex_;
|
||||||
|
std::vector<std::weak_ptr<MultiImpl> > impls_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class MultiClient::Impl final {
|
||||||
|
public:
|
||||||
|
ClientId create_client() {
|
||||||
|
auto impl = pool_.get();
|
||||||
|
auto client_id = impl->create(*receiver_);
|
||||||
|
{
|
||||||
|
auto lock = impls_mutex_.lock_write().move_as_ok();
|
||||||
|
impls_[client_id] = std::move(impl);
|
||||||
|
}
|
||||||
|
return client_id;
|
||||||
|
}
|
||||||
|
void send(ClientId client_id, RequestId request_id, Function function) {
|
||||||
|
auto lock = impls_mutex_.lock_read().move_as_ok();
|
||||||
|
auto it = impls_.find(client_id);
|
||||||
|
CHECK(it != impls_.end());
|
||||||
|
it->second->send(client_id, request_id, std::move(function));
|
||||||
|
}
|
||||||
|
Response receive(double timeout) {
|
||||||
|
auto res = receiver_->receive(timeout);
|
||||||
|
if (res.client_id != 0 && !res.object) {
|
||||||
|
auto lock = impls_mutex_.lock_write().move_as_ok();
|
||||||
|
impls_.erase(res.client_id);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
static Object execute(Function &&function) {
|
||||||
|
return Td::static_request(std::move(function));
|
||||||
|
}
|
||||||
|
|
||||||
|
~Impl() {
|
||||||
|
for (auto &it : impls_) {
|
||||||
|
it.second->destroy(it.first);
|
||||||
|
}
|
||||||
|
while (!impls_.empty()) {
|
||||||
|
receive(10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
MultiImplPool pool_;
|
||||||
|
td::RwMutex impls_mutex_;
|
||||||
|
std::unordered_map<ClientId, std::shared_ptr<MultiImpl> > impls_;
|
||||||
|
td::unique_ptr<TdReceiver> receiver_{td::make_unique<TdReceiverTs>()};
|
||||||
};
|
};
|
||||||
|
|
||||||
class Client::Impl final {
|
class Client::Impl final {
|
||||||
public:
|
public:
|
||||||
using OutputQueue = MpscPollableQueue<Client::Response>;
|
|
||||||
Impl() {
|
Impl() {
|
||||||
multi_impl_ = MultiImpl::get();
|
static MultiImplPool pool;
|
||||||
td_id_ = multi_impl_->create_id();
|
multi_impl_ = pool.get();
|
||||||
output_queue_ = std::make_shared<OutputQueue>();
|
receiver_ = make_unique<TdReceiverTs>();
|
||||||
output_queue_->init();
|
td_id_ = multi_impl_->create(*receiver_);
|
||||||
|
|
||||||
class Callback : public TdCallback {
|
|
||||||
public:
|
|
||||||
explicit Callback(std::shared_ptr<OutputQueue> output_queue) : output_queue_(std::move(output_queue)) {
|
|
||||||
}
|
|
||||||
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
|
|
||||||
output_queue_->writer_put({id, std::move(result)});
|
|
||||||
}
|
|
||||||
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
|
|
||||||
output_queue_->writer_put({id, std::move(error)});
|
|
||||||
}
|
|
||||||
Callback(const Callback &) = delete;
|
|
||||||
Callback &operator=(const Callback &) = delete;
|
|
||||||
Callback(Callback &&) = delete;
|
|
||||||
Callback &operator=(Callback &&) = delete;
|
|
||||||
~Callback() override {
|
|
||||||
output_queue_->writer_put({0, nullptr});
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::shared_ptr<OutputQueue> output_queue_;
|
|
||||||
};
|
|
||||||
|
|
||||||
multi_impl_->create(td_id_, td::make_unique<Callback>(output_queue_));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void send(Client::Request request) {
|
void send(Client::Request request) {
|
||||||
@ -263,18 +426,20 @@ class Client::Impl final {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
multi_impl_->send(td_id_, std::move(request));
|
multi_impl_->send(td_id_, request.id, std::move(request.function));
|
||||||
}
|
}
|
||||||
|
|
||||||
Client::Response receive(double timeout) {
|
Client::Response receive(double timeout) {
|
||||||
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
|
auto res = receiver_->receive(0);
|
||||||
auto is_locked = receive_lock_.exchange(true);
|
|
||||||
CHECK(!is_locked);
|
if (res.client_id != 0 && !res.object) {
|
||||||
auto response = receive_unlocked(timeout);
|
is_closed_ = true;
|
||||||
is_locked = receive_lock_.exchange(false);
|
}
|
||||||
CHECK(is_locked);
|
|
||||||
VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get();
|
Client::Response old_res;
|
||||||
return response;
|
old_res.id = res.id;
|
||||||
|
old_res.object = std::move(res.object);
|
||||||
|
return old_res;
|
||||||
}
|
}
|
||||||
|
|
||||||
Impl(const Impl &) = delete;
|
Impl(const Impl &) = delete;
|
||||||
@ -290,31 +455,10 @@ class Client::Impl final {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<MultiImpl> multi_impl_;
|
std::shared_ptr<MultiImpl> multi_impl_;
|
||||||
|
td::unique_ptr<TdReceiver> receiver_;
|
||||||
|
|
||||||
std::shared_ptr<OutputQueue> output_queue_;
|
|
||||||
int output_queue_ready_cnt_{0};
|
|
||||||
std::atomic<bool> receive_lock_{false};
|
|
||||||
bool is_closed_{false};
|
bool is_closed_{false};
|
||||||
int32 td_id_;
|
int32 td_id_;
|
||||||
|
|
||||||
Client::Response receive_unlocked(double timeout) {
|
|
||||||
if (output_queue_ready_cnt_ == 0) {
|
|
||||||
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
|
|
||||||
}
|
|
||||||
if (output_queue_ready_cnt_ > 0) {
|
|
||||||
output_queue_ready_cnt_--;
|
|
||||||
auto res = output_queue_->reader_get_unsafe();
|
|
||||||
if (res.object == nullptr && res.id == 0) {
|
|
||||||
is_closed_ = true;
|
|
||||||
}
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
if (timeout != 0) {
|
|
||||||
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
|
|
||||||
return receive_unlocked(0);
|
|
||||||
}
|
|
||||||
return {0, nullptr};
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -342,4 +486,24 @@ Client::~Client() = default;
|
|||||||
Client::Client(Client &&other) = default;
|
Client::Client(Client &&other) = default;
|
||||||
Client &Client::operator=(Client &&other) = default;
|
Client &Client::operator=(Client &&other) = default;
|
||||||
|
|
||||||
|
MultiClient::MultiClient() : impl_(std::make_unique<Impl>()) {
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiClient::ClientId MultiClient::create_client() {
|
||||||
|
return impl_->create_client();
|
||||||
|
}
|
||||||
|
void MultiClient::send(ClientId client_id, RequestId request_id, Function function) {
|
||||||
|
impl_->send(client_id, request_id, std::move(function));
|
||||||
|
}
|
||||||
|
MultiClient::Response MultiClient::receive(double timeout) {
|
||||||
|
return impl_->receive(timeout);
|
||||||
|
}
|
||||||
|
MultiClient::Object MultiClient::execute(Function &&function) {
|
||||||
|
return Impl::execute(std::move(function));
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiClient::~MultiClient() = default;
|
||||||
|
MultiClient::MultiClient(MultiClient &&other) = default;
|
||||||
|
MultiClient &MultiClient::operator=(MultiClient &&other) = default;
|
||||||
|
|
||||||
} // namespace td
|
} // namespace td
|
||||||
|
@ -127,6 +127,37 @@ class Client final {
|
|||||||
Client &operator=(Client &&other);
|
Client &operator=(Client &&other);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
friend class MultiClient;
|
||||||
|
class Impl;
|
||||||
|
std::unique_ptr<Impl> impl_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// --- EXPERIMENTAL ---
|
||||||
|
class MultiClient final {
|
||||||
|
public:
|
||||||
|
MultiClient();
|
||||||
|
|
||||||
|
using ClientId = std::int32_t;
|
||||||
|
using RequestId = std::uint64_t;
|
||||||
|
using Function = td_api::object_ptr<td_api::Function>;
|
||||||
|
using Object = td_api::object_ptr<td_api::Object>;
|
||||||
|
struct Response {
|
||||||
|
ClientId client_id;
|
||||||
|
RequestId id;
|
||||||
|
Object object;
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientId create_client();
|
||||||
|
void send(ClientId client_id, RequestId request_id, Function function);
|
||||||
|
Response receive(double timeout);
|
||||||
|
static Object execute(Function &&function);
|
||||||
|
|
||||||
|
~MultiClient();
|
||||||
|
MultiClient(MultiClient &&other);
|
||||||
|
MultiClient &operator=(MultiClient &&other);
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class Client;
|
||||||
class Impl;
|
class Impl;
|
||||||
std::unique_ptr<Impl> impl_;
|
std::unique_ptr<Impl> impl_;
|
||||||
};
|
};
|
||||||
|
@ -895,6 +895,31 @@ TEST(Client, Multi) {
|
|||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TEST(Client, MultiNew) {
|
||||||
|
std::vector<td::thread> threads;
|
||||||
|
td::MultiClient client;
|
||||||
|
int threads_n = 4;
|
||||||
|
int clients_n = 1000;
|
||||||
|
for (int i = 0; i < threads_n; i++) {
|
||||||
|
threads.emplace_back([&] {
|
||||||
|
for (int i = 0; i < clients_n; i++) {
|
||||||
|
auto id = client.create_client();
|
||||||
|
client.send(id, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
for (auto &thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::set<std::int32_t> ids;
|
||||||
|
while (ids.size() * threads_n * clients_n) {
|
||||||
|
auto event = client.receive(10);
|
||||||
|
if (event.client_id != 0 && event.id == 3) {
|
||||||
|
ids.insert(event.client_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
TEST(PartsManager, hands) {
|
TEST(PartsManager, hands) {
|
||||||
|
Loading…
Reference in New Issue
Block a user