NetQuery: support priority

GitOrigin-RevId: 84290ffa7689364f74140c5b951b95ea4792b68d
This commit is contained in:
Arseny Smirnov 2020-08-17 16:13:18 +03:00
parent 94b78114e0
commit 1f9529520f
4 changed files with 47 additions and 10 deletions

View File

@ -71,7 +71,9 @@ AuthManager::AuthManager(int32 api_id, const string &api_hash, ActorShared<> par
void AuthManager::start_up() { void AuthManager::start_up() {
if (state_ == State::LoggingOut) { if (state_ == State::LoggingOut) {
start_net_query(NetQueryType::LogOut, G()->net_query_creator().create(telegram_api::auth_logOut())); auto query = G()->net_query_creator().create(telegram_api::auth_logOut());
query->set_priority(1);
start_net_query(NetQueryType::LogOut, std::move(query));
} else if (state_ == State::DestroyingKeys) { } else if (state_ == State::DestroyingKeys) {
destroy_auth_keys(); destroy_auth_keys();
} }
@ -364,7 +366,9 @@ void AuthManager::log_out(uint64 query_id) {
LOG(INFO) << "Logging out"; LOG(INFO) << "Logging out";
G()->td_db()->get_binlog_pmc()->set("auth", "logout"); G()->td_db()->get_binlog_pmc()->set("auth", "logout");
update_state(State::LoggingOut); update_state(State::LoggingOut);
start_net_query(NetQueryType::LogOut, G()->net_query_creator().create(telegram_api::auth_logOut())); auto query = G()->net_query_creator().create(telegram_api::auth_logOut());
query->set_priority(1);
start_net_query(NetQueryType::LogOut, std::move(query));
} }
} }

View File

@ -266,6 +266,13 @@ class NetQuery : public TsListNode<NetQueryDebug> {
finish_migrate(cancel_slot_); finish_migrate(cancel_slot_);
} }
int8 priority() const {
return priority_;
}
void set_priority(int8 priority) {
priority_ = priority;
}
private: private:
State state_ = State::Empty; State state_ = State::Empty;
Type type_ = Type::Common; Type type_ = Type::Common;
@ -284,6 +291,7 @@ class NetQuery : public TsListNode<NetQueryDebug> {
uint32 session_rand_ = 0; uint32 session_rand_ = 0;
bool may_be_lost_ = false; bool may_be_lost_ = false;
int8 priority_{0};
template <class T> template <class T>
struct movable_atomic : public std::atomic<T> { struct movable_atomic : public std::atomic<T> {

View File

@ -117,6 +117,23 @@ class GenAuthKeyActor : public Actor {
} // namespace detail } // namespace detail
void Session::PriorityQueue::push(NetQueryPtr query) {
queries_[query->priority()].push(std::move(query));
}
NetQueryPtr Session::PriorityQueue::pop() {
auto it = prev(end(queries_));
auto res = it->second.pop();
if (it->second.empty()) {
queries_.erase(it);
}
return res;
}
bool Session::PriorityQueue::empty() const {
return queries_.empty();
}
Session::Session(unique_ptr<Callback> callback, std::shared_ptr<AuthDataShared> shared_auth_data, int32 raw_dc_id, Session::Session(unique_ptr<Callback> callback, std::shared_ptr<AuthDataShared> shared_auth_data, int32 raw_dc_id,
int32 dc_id, bool is_main, bool use_pfs, bool is_cdn, bool need_destroy, int32 dc_id, bool is_main, bool use_pfs, bool is_cdn, bool need_destroy,
const mtproto::AuthKey &tmp_auth_key, std::vector<mtproto::ServerSalt> server_salts) const mtproto::AuthKey &tmp_auth_key, std::vector<mtproto::ServerSalt> server_salts)
@ -344,7 +361,7 @@ void Session::return_query(NetQueryPtr &&query) {
void Session::flush_pending_invoke_after_queries() { void Session::flush_pending_invoke_after_queries() {
while (!pending_invoke_after_queries_.empty()) { while (!pending_invoke_after_queries_.empty()) {
auto &query = pending_invoke_after_queries_.front(); auto &query = pending_invoke_after_queries_.front();
pending_queries_.push_back(std::move(query)); pending_queries_.push(std::move(query));
pending_invoke_after_queries_.pop_front(); pending_invoke_after_queries_.pop_front();
} }
} }
@ -359,7 +376,7 @@ void Session::close() {
auto &query = it.second.query; auto &query = it.second.query;
query->set_message_id(0); query->set_message_id(0);
query->cancel_slot_.clear_event(); query->cancel_slot_.clear_event();
pending_queries_.push_back(std::move(query)); pending_queries_.push(std::move(query));
} }
sent_queries_.clear(); sent_queries_.clear();
sent_containers_.clear(); sent_containers_.clear();
@ -367,10 +384,9 @@ void Session::close() {
flush_pending_invoke_after_queries(); flush_pending_invoke_after_queries();
CHECK(sent_queries_.empty()); CHECK(sent_queries_.empty());
while (!pending_queries_.empty()) { while (!pending_queries_.empty()) {
auto &query = pending_queries_.front(); auto query = pending_queries_.pop();
query->set_error_resend(); query->set_error_resend();
return_query(std::move(query)); return_query(std::move(query));
pending_queries_.pop_front();
} }
callback_->on_closed(); callback_->on_closed();
@ -905,7 +921,7 @@ void Session::add_query(NetQueryPtr &&net_query) {
net_query->debug("Session: pending"); net_query->debug("Session: pending");
LOG_IF(FATAL, UniqueId::extract_type(net_query->id()) == UniqueId::BindKey) LOG_IF(FATAL, UniqueId::extract_type(net_query->id()) == UniqueId::BindKey)
<< "Add BindKey query inpo pending_queries_"; << "Add BindKey query inpo pending_queries_";
pending_queries_.emplace_back(std::move(net_query)); pending_queries_.push(std::move(net_query));
} }
void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_query, uint64 message_id) { void Session::connection_send_query(ConnectionInfo *info, NetQueryPtr &&net_query, uint64 message_id) {
@ -1323,9 +1339,8 @@ void Session::loop() {
if (auth_data_.is_ready(Time::now_cached())) { if (auth_data_.is_ready(Time::now_cached())) {
if (need_send_query()) { if (need_send_query()) {
while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) { while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) {
auto &query = pending_queries_.front(); auto query = pending_queries_.pop();
connection_send_query(&main_connection_, std::move(query)); connection_send_query(&main_connection_, std::move(query));
pending_queries_.pop_front();
need_flush = true; need_flush = true;
} }
} }

View File

@ -25,6 +25,7 @@
#include "td/utils/List.h" #include "td/utils/List.h"
#include "td/utils/Status.h" #include "td/utils/Status.h"
#include "td/utils/StringBuilder.h" #include "td/utils/StringBuilder.h"
#include "td/utils/VectorQueue.h"
#include <array> #include <array>
#include <deque> #include <deque>
@ -124,7 +125,16 @@ class Session final
// Do not invalidate iterators of these two containers! // Do not invalidate iterators of these two containers!
// TODO: better data structures // TODO: better data structures
std::deque<NetQueryPtr> pending_queries_; struct PriorityQueue {
public:
void push(NetQueryPtr query);
NetQueryPtr pop();
bool empty() const;
private:
std::map<int8, VectorQueue<NetQueryPtr>> queries_;
};
PriorityQueue pending_queries_;
std::map<uint64, Query> sent_queries_; std::map<uint64, Query> sent_queries_;
std::deque<NetQueryPtr> pending_invoke_after_queries_; std::deque<NetQueryPtr> pending_invoke_after_queries_;
ListNode sent_queries_list_; ListNode sent_queries_list_;